tobby - 2 months ago
Scala Question

How to join two datasets by key in Scala Spark

I have two datasets, and each dataset has two fields.
Below are examples.

Data1: (name, animal)

('abc,def', 'monkey(1)')
('df,gh', 'zebra')
...


Data2: (name, fruit)

('a,efg', 'apple')
('abc,def', 'banana(1)')
...


Results expected: (name, animal, fruit)

('abc,def', 'monkey(1)', 'banana(1)')
...


I want to join these two datasets on the first column, 'name'. I have tried to do this for a couple of hours, but I couldn't figure it out. Can anyone help me?

val sparkConf = new SparkConf().setAppName("abc").setMaster("local[2]")
val sc = new SparkContext(sparkConf)
val text1 = sc.textFile(args(0))
val text2 = sc.textFile(args(1))

val joined = text1.join(text2)


The code above does not work!

Answer

join is defined on RDDs of pairs, that is, RDDs of type RDD[(K, V)]. The first step, then, is to transform the input data into that type.
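For reference, here is a minimal sketch of what join does on pair RDDs (assuming a live SparkContext named sc, as in the question):

// Given RDD[(K, V)] and RDD[(K, W)], join yields RDD[(K, (V, W))],
// keeping only the keys that appear in both RDDs (an inner join).
val left = sc.parallelize(Seq(("k1", "a"), ("k2", "b")))
val right = sc.parallelize(Seq(("k1", "x"), ("k3", "y")))
left.join(right).collect // Array((k1,(a,x)))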

We first need to transform the original data of type String into pairs of (Key, Value):

val parse: String => (String, String) = s => {
  // Matches rows of the form ('key', 'value'). The key may itself contain
  // commas, so we capture everything between each pair of single quotes.
  val regex = "^\\('([^']+)',[\\W]*'([^']+)'\\)$".r
  s match {
    case regex(k, v) => (k, v)
    case _ => ("", "") // fall back to an empty pair on malformed input
  }
}

(Note that we can't use a simple split(",") here because the keys themselves contain commas.)
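A quick sanity check of the parser on the sample rows:

parse("('abc,def', 'monkey(1)')") // => ("abc,def", "monkey(1)")
parse("('df,gh', 'zebra')")       // => ("df,gh", "zebra")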

Then we use that function to parse the text input data:

val s1 = Seq("('abc,def', 'monkey(1)')","('df,gh', 'zebra')")
val s2 = Seq("('a,efg', 'apple')","('abc,def', 'banana(1)')")

// With files, this would be sc.textFile(args(0)).map(parse) instead.
val rdd1 = sc.parallelize(s1)
val rdd2 = sc.parallelize(s2)

val kvRdd1 = rdd1.map(parse)
val kvRdd2 = rdd2.map(parse)
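
At this point both RDDs hold (name, value) pairs; collecting kvRdd1 should show the parsed tuples:

kvRdd1.collect
// Array((abc,def,monkey(1)), (df,gh,zebra))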

Finally, we use the join method to join the two RDDs:

val joined = kvRdd1.join(kvRdd2)

// Let's check the results

joined.collect

// res31: Array[(String, (String, String))] = Array((abc,def,(monkey(1),banana(1))))
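
The join result nests the values as (name, (animal, fruit)). To get the flat (name, animal, fruit) triples from the expected results, one more map flattens the tuple:

val triples = joined.map { case (name, (animal, fruit)) => (name, animal, fruit) }

triples.collect
// Array((abc,def,monkey(1),banana(1)))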