tobby tobby - 1 year ago 75
Scala Question

How to join two datasets by key in Scala Spark

I have two datasets, each with two fields per record.
Examples are below.

Data1: (name, animal)

('abc,def', 'monkey(1)')
('df,gh', 'zebra')

Data2: (name, fruit)

('a,efg', 'apple')
('abc,def', 'banana(1)')

Results expected: (name, animal, fruit)

('abc,def', 'monkey(1)', 'banana(1)')

I want to join these two datasets on the first column, 'name'. I have tried for a couple of hours but couldn't figure it out. Can anyone help me?

val sparkConf = new SparkConf().setAppName("abc").setMaster("local[2]")
val sc = new SparkContext(sparkConf)
val text1 = sc.textFile(args(0))
val text2 = sc.textFile(args(1))

val joined = text1.join(text2)

The code above does not work!

Answer Source

join is defined on RDDs of pairs, that is, RDDs of type RDD[(K,V)]. The first step needed is to transform the input data into the right type.

We first need to transform the original data of type String into pairs of (Key, Value):

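val parse: String => (String, String) = s => {
  // Captures the quoted key and the quoted value of a line like ('abc,def', 'monkey(1)')
  val regex = "^\\('([^']+)',[\\W]*'([^']+)'\\)$".r
  s match {
    case regex(k, v) => (k, v)
    case _ => ("", "") // lines that do not match become an empty pair
  }
}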

(Note that we can't use a simple split(",") expression because the key contains commas)
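One thing to watch with the ("","") fallback: unparsable lines from both inputs end up under the same empty key, so a later join would pair them with each other. A minimal sketch of an alternative, assuming it is acceptable to drop such lines, returns an Option instead. The names ParseDemo and parseOpt are hypothetical and not part of the original answer; the regex is the same one used in parse.

```scala
object ParseDemo {
  // Same pattern as in the answer: a quoted key, a comma,
  // optional non-word characters, then a quoted value.
  private val Line = "^\\('([^']+)',[\\W]*'([^']+)'\\)$".r

  // Hypothetical variant of `parse`: None for lines that do not match.
  def parseOpt(s: String): Option[(String, String)] = s match {
    case Line(k, v) => Some((k, v))
    case _          => None
  }

  def main(args: Array[String]): Unit = {
    val lines = Seq("('abc,def', 'monkey(1)')", "('df,gh', 'zebra')", "not a record")
    // Prints one Option per input line; the last line does not match the pattern.
    lines.map(parseOpt).foreach(println)
  }
}
```

On an RDD, something like rdd1.flatMap(s => ParseDemo.parseOpt(s)) should then keep only the lines that parsed, since flatMap discards the None results.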

Then we use that function to parse the text input data:

val s1 = Seq("('abc,def', 'monkey(1)')","('df,gh', 'zebra')")
val s2 = Seq("('a,efg', 'apple')","('abc,def', 'banana(1)')")

val rdd1 = sparkContext.parallelize(s1)
val rdd2 = sparkContext.parallelize(s2)

val kvRdd1 = rdd1.map(parse)
val kvRdd2 = rdd2.map(parse)

Finally, we use the join method to join the two RDDs

val joined = kvRdd1.join(kvRdd2)

// Let's check out results
joined.collect

// res31: Array[(String, (String, String))] = Array((abc,def,(monkey(1),banana(1))))
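Note that the join yields nested pairs of the form (name, (animal, fruit)), while the question asked for flat (name, animal, fruit) triples. The sketch below reproduces the shape of an inner join on ordinary Scala collections, with no Spark involved, and then flattens the result. JoinShapeDemo and innerJoin are hypothetical names used only for illustration; they are not Spark API.

```scala
object JoinShapeDemo {
  // Hypothetical helper (not part of Spark): inner join of two key-value
  // sequences, keeping only keys that appear on both sides.
  def innerJoin[K, V, W](left: Seq[(K, V)], right: Seq[(K, W)]): Seq[(K, (V, W))] =
    for {
      (k1, v) <- left
      (k2, w) <- right
      if k1 == k2
    } yield (k1, (v, w))

  def main(args: Array[String]): Unit = {
    val animals = Seq(("abc,def", "monkey(1)"), ("df,gh", "zebra"))
    val fruits  = Seq(("a,efg", "apple"), ("abc,def", "banana(1)"))

    val joined = innerJoin(animals, fruits)
    // Flatten (name, (animal, fruit)) into the (name, animal, fruit) triple.
    val triples = joined.map { case (name, (animal, fruit)) => (name, animal, fruit) }
    triples.foreach(println)
  }
}
```

The same pattern-matching map should work on the joined RDD itself: joined.map { case (name, (animal, fruit)) => (name, animal, fruit) }.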