user3560220 user3560220 - 1 month ago 22
Scala Question

Pattern matching - spark scala RDD

I am new to Spark and Scala coming from R background.After a few transformations of RDD, I get a RDD of type

Description: RDD[(String, Int)]


Now I want to apply a Regular expression on the String RDD and extract substrings from the String and add just substring in a new coloumn.

Input Data :

BMW 1er Model,278
MINI Cooper Model,248


Output I am looking for :

Input | Brand | Series
BMW 1er Model,278, BMW , 1er
MINI Cooper Model ,248 MINI , Cooper


where Brand and Series are newly calculated substrings from String RDD

What I have done so far.

I could achieve this for a String using regular expression, but I cani apply fro all lines.

val brandRegEx = """^.*[Bb][Mm][Ww]+|.[Mm][Ii][Nn][Ii]+.*$""".r //to look for BMW or MINI


Then I can use

brandRegEx.findFirstIn("hello this mini is bmW testing")


But how can I use it for all the lines of RDD and to apply different regular expression to achieve the output as above.

I read about this code snippet, but not sure how to put it altogether.

val brandRegEx = """^.*[Bb][Mm][Ww]+|.[Mm][Ii][Nn][Ii]+.*$""".r

def getBrand(Col4: String) : String = Col4 match {
case brandRegEx(str) =>
case _ => ""
return 'substring
}


Any help would be appreciated !

Thanks

Answer

To apply your regex to each item in the RDD, you should use the RDD map function, which transforms each row in the RDD using some function (in this case, a Partial Function in order to extract to two parts of the tuple which makes up each row):

import org.apache.spark.{SparkContext, SparkConf}

object Example extends App {

  val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("Example"))

  val data = Seq(
    ("BMW 1er Model",278),
    ("MINI Cooper Model",248))

  val dataRDD = sc.parallelize(data)

  val processedRDD = dataRDD.map{
    case (inString, inInt) =>
      val brandRegEx = """^.*[Bb][Mm][Ww]+|.[Mm][Ii][Nn][Ii]+.*$""".r
      val brand = brandRegEx.findFirstIn(inString)
      //val seriesRegEx = ...
      //val series = seriesRegEx.findFirstIn(inString)
      val series = "foo"
      (inString, inInt, brand, series)
  }

  processedRDD.collect().foreach(println)
  sc.stop()
}

Note that I think you have some problems in your regular expression, and you also need a regular expression for finding the series. This code outputs:

(BMW 1er Model,278,BMW,foo)
(MINI Cooper Model,248,NOT FOUND,foo)

But if you correct your regexes for your needs, this is how you can apply them to each row.

Comments