user3663622 - 1 month ago
Scala Question

Intersection of two map RDDs in Scala

I have two RDDs, for example:
firstmapRDD - (0-14, List(0, 4, 19, 19079, 42697, 444, 42748))

secondmapRdd - (0-14, List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94))

I want to find the intersection of the two value lists for each key.
I tried var interResult = firstmapRDD.intersection(secondmapRdd), but the output file is empty.
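RDD.intersection behaves like a set intersection over whole elements, so a (key, list) pair only survives if an identical pair, list included, exists in the other RDD. A minimal local sketch, with plain Scala collections standing in for the two RDDs (no Spark involved), shows why the result is empty:

```scala
// Local stand-ins for the two RDDs: each element is a whole (key, values) pair.
val first  = Seq(("0-14", List(0, 4, 19, 19079, 42697, 444, 42748)))
val second = Seq(("0-14", (1 to 94).toList))

// Like RDD.intersection, Seq.intersect compares entire elements:
// the keys match but the lists differ, so no pair is shared.
val wholeElementIntersection = first.intersect(second)
println(wholeElementIntersection) // List()
```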

I also tried cogrouping by key, mapRDD.cogroup(secondMapRDD).filter(x => ...), but I don't know how to intersect the values from the two sides. Is it x => x._1.intersect(x._2)? Can someone help me with the syntax?

Even this throws a compile-time error: mapRDD.cogroup(secondMapRDD).filter(x => x._1.intersect(x._2))
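After cogroup, each element has the shape (key, (values from the first RDD, values from the second RDD)), so x._1 is the key rather than a list, and filter expects a Boolean predicate, not an intersection. The sketch below models one cogrouped element locally, assuming String keys and List[Int] values as in the example data; the value x is a hypothetical stand-in, not output captured from Spark:

```scala
// Hypothetical local stand-in for one element of mapRDD.cogroup(secondMapRDD):
// (key, (values from the first RDD, values from the second RDD)).
val x: (String, (Iterable[List[Int]], Iterable[List[Int]])) =
  ("0-14", (Iterable(List(0, 4, 19)), Iterable((1 to 94).toList)))

// x._1 is the String key, not a collection of lists, so x._1.intersect(x._2)
// does not type-check. The two sides live under x._2.
val leftSide: Iterable[List[Int]]  = x._2._1
val rightSide: Iterable[List[Int]] = x._2._2

// filter needs a Boolean, e.g. "the key appears on both sides";
// computing the intersection itself belongs in map/mapValues.
val keepKey: Boolean = leftSide.nonEmpty && rightSide.nonEmpty
```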

var mapRDD = sc.parallelize(map.toList)
var secondMapRDD = sc.parallelize(secondMap.toList)
var interResult = mapRDD.intersection(secondMapRDD)


Perhaps the values are wrapped as ArrayBuffer[List[...]], and that is why the intersection does not work. Is there a way to unwrap them?

I also tried this:

var interResult = mapRDD.cogroup(secondMapRDD)
  .filter { case (_, (l, r)) => l.nonEmpty && r.nonEmpty }
  .map { case (k, (l, r)) => (k, l.toList.intersect(r.toList)) }


Still getting an empty list!
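The empty list is consistent with the wrapping suspected above: after cogroup each side is a collection of Lists, so l.toList is a List[List[Int]] and intersect compares the inner lists as whole values. A local sketch, with plain collections standing in for one cogrouped (l, r) pair; intersectGroups is a hypothetical helper name, and flattening is one way to unwrap, not the only one:

```scala
// Local stand-ins for the two sides of one cogrouped value, (l, r).
val l: Iterable[List[Int]] = Iterable(List(0, 4, 19, 19079, 42697, 444, 42748))
val r: Iterable[List[Int]] = Iterable((1 to 94).toList)

// l.toList is a List[List[Int]]: intersect compares whole inner lists,
// and since they differ, nothing matches.
val nested: List[List[Int]] = l.toList.intersect(r.toList)

// Hypothetical helper: flatten each side first (removing the wrapper),
// then intersect the Ints themselves.
def intersectGroups(left: Iterable[List[Int]], right: Iterable[List[Int]]): List[Int] =
  left.flatten.toList.intersect(right.flatten.toList)

val fixed: List[Int] = intersectGroups(l, r)
println(nested) // List()
println(fixed)  // List(4, 19)
```

Assuming both RDDs are RDD[(String, List[Int])], the same helper could be plugged into the cogroup pipeline as .mapValues { case (l, r) => intersectGroups(l, r) } in place of the final map.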

Answer

Since you want to intersect the values, join both RDDs on the key to get the matching pair of value lists for each key, then intersect each pair.

Sample code:

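// Assumes an existing SparkContext named sparkContext (e.g. sc in spark-shell)
val firstMap = Map(1 -> List(1, 2, 3, 4, 5))
val secondMap = Map(1 -> List(1, 2, 5))

val firstKeyRDD = sparkContext.parallelize(firstMap.toList, 2)
val secondKeyRDD = sparkContext.parallelize(secondMap.toList, 2)

// join pairs up the lists that share a key: (key, (firstList, secondList))
val joinedRDD = firstKeyRDD.join(secondKeyRDD)

// Intersect the two matched lists for each key
val finalResult = joinedRDD.mapValues { case (firstList, secondList) =>
  firstList.intersect(secondList)
}

// collect() brings the results to the driver so println output appears there
finalResult.collect().foreach(println)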

The output will be

(1,List(1, 2, 5))
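Applied to the data from the question, the same join-then-intersect idea can be checked locally. In this sketch Maps stand in for the pair RDDs (no Spark involved), and the "15-29" key is hypothetical, added only to show that an inner join like RDD.join drops keys present on one side only:

```scala
// Local model of join-then-intersect using the question's data.
val firstMap = Map(
  "0-14"  -> List(0, 4, 19, 19079, 42697, 444, 42748),
  "15-29" -> List(1, 2, 3) // hypothetical key present only on this side
)
val secondMap = Map("0-14" -> (1 to 94).toList)

// Inner join on keys: keys missing on either side are dropped.
val joined: Map[String, (List[Int], List[Int])] =
  firstMap.flatMap { case (k, l) => secondMap.get(k).map(r => (k, (l, r))) }

// Intersect the two matched lists for each key.
val result: Map[String, List[Int]] =
  joined.map { case (k, (l, r)) => (k, l.intersect(r)) }

println(result) // Map(0-14 -> List(4, 19))
```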