GangsterMaur - 3 months ago
Scala Question

How to create a Map using multiple lists in Spark

I am trying to figure out how to access particular elements of the RDD myRDD:

(600,List((600,111,7,1), (615,111,3,5)))
(601,List((622,112,2,1), (615,111,3,5), (456,111,9,12)))


I want to extract some data from a Redis DB, using the third field of each tuple in the list as the ID. For example, for (600,List((600,111,7,1), (615,111,3,5))) the IDs are 7 and 3. For (601,List((622,112,2,1), (615,111,3,5), (456,111,9,12))) the IDs are 2, 3 and 9.
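A minimal sketch of the ID extraction described above, using a local List with the same shape as myRDD. It assumes the element type is (Int, List[(Int, Int, Int, Int)]), as the sample data suggests; the object and method names are hypothetical. The same lambda should also work inside myRDD.map, since RDD.map takes a function of the element type.

```scala
object ExtractIds {
  // Local stand-in for myRDD; element type assumed to be (Int, List[(Int, Int, Int, Int)])
  val records: List[(Int, List[(Int, Int, Int, Int)])] = List(
    (600, List((600, 111, 7, 1), (615, 111, 3, 5))),
    (601, List((622, 112, 2, 1), (615, 111, 3, 5), (456, 111, 9, 12)))
  )

  // For each record, keep the key and take the third field (_._3) of every tuple in its list
  def idsPerKey(rs: List[(Int, List[(Int, Int, Int, Int)])]): List[(Int, List[Int])] =
    rs.map { case (key, tuples) => (key, tuples.map(_._3)) }

  def main(args: Array[String]): Unit =
    idsPerKey(records).foreach(println) // prints (600,List(7, 3)) then (601,List(2, 3, 9))
}
```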

The problem is that I don't know how to collect values for multiple IDs. In the code below I use line._2(3), but that is wrong: it accesses the element at index 3 of the list (a whole tuple), not a field inside each tuple. Should I use flatMap or something similar?
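On the flatMap question, here is a small plain-Scala sketch (hypothetical names, local data mirroring myRDD) contrasting the three operations. line._2(3) is List.apply(3), i.e. positional indexing, which throws IndexOutOfBoundsException when the list has fewer than four elements; the sketch uses lift(3) to get an Option instead. map keeps one list of IDs per key, while flatMap merges all IDs into a single flat list, losing the per-key grouping that the desired result Map needs.

```scala
object MapVsFlatMap {
  type Rec = (Int, List[(Int, Int, Int, Int)])

  val records: List[Rec] = List(
    (600, List((600, 111, 7, 1), (615, 111, 3, 5))),
    (601, List((622, 112, 2, 1), (615, 111, 3, 5), (456, 111, 9, 12)))
  )

  // What line._2(3) does: fetch the whole tuple at index 3 (lift returns None instead of throwing)
  def byIndex(rec: Rec): Option[(Int, Int, Int, Int)] = rec._2.lift(3)

  // map: one list of third fields per record, grouping preserved
  def perRecord(rs: List[Rec]): List[List[Int]] = rs.map(_._2.map(_._3))

  // flatMap: all third fields in a single list, grouping lost
  def flattened(rs: List[Rec]): List[Int] = rs.flatMap(_._2.map(_._3))

  def main(args: Array[String]): Unit = {
    println(records.map(byIndex)) // neither list has an element at index 3
    println(perRecord(records))
    println(flattened(records))
  }
}
```

flatMap can still be handy if you only want the set of distinct IDs to look up (for example flattened(records).distinct), but to build one Map per key, a map over each inner list is the better fit.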

val newRDD = myRDD.mapPartitions(iter => {
  // One Redis connection pool per partition
  val redisPool = new Pool(new JedisPool(new JedisPoolConfig(), "localhost", 6379, 2000))
  iter.map({ line => (line._1,
    redisPool.withJedisClient { client =>
      // Problem: line._2(3) is the list element at index 3, not the third field of each tuple
      val start_date: String = Dress.up(client).hget("id:" + line._2(3), "start_date")
      val end_date: String = Dress.up(client).hget("id:" + line._2(3), "end_date")
      val additionalData = List((start_date, end_date))
      Map(("base_data", line._2), ("additional_data", additionalData))
    })
  })
})
newRDD.collect().foreach(println)


Assuming the Redis DB contains the relevant data, newRDD could look like this:

(600,Map("base_data" -> List((600,111,7,1), (615,111,3,5)), "additional_data" -> List((2014,2015),(2015,2016))))
(601,Map("base_data" -> List((622,112,2,1), (615,111,3,5), (456,111,9,12)), "additional_data" -> List((2010,2015),(2011,2016),(2014,2016))))

Answer

To get the list of third elements of the tuples in line._2, use line._2.map(_._3) (assuming line has type (Int, List[(Int, Int, Int, Int)]), as your example suggests, and that no types like Any are involved). Overall, your code should look something like this:

// Replaces the iter.map(...) call inside mapPartitions in the question's code
iter.map({ case (first, second) => (first,
  redisPool.withJedisClient { client =>
    // One (start_date, end_date) pair per tuple, looked up by the tuple's third field
    val additionalData = second.map { tuple =>
      val start_date: String = Dress.up(client).hget("id:" + tuple._3, "start_date")
      val end_date: String = Dress.up(client).hget("id:" + tuple._3, "end_date")
      (start_date, end_date)
    }
    Map(("base_data", second), ("additional_data", additionalData))
  })
})
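To check the answer's logic without a Spark cluster or a Redis server, here is a minimal plain-Scala sketch. The in-memory fakeRedis map, its made-up dates, and the hget helper are hypothetical stand-ins for the Redis hashes and Dress.up(client).hget; the transformation itself mirrors the answer: map over the inner list and look each tuple up by its third field.

```scala
object LookupSketch {
  type Rec = (Int, List[(Int, Int, Int, Int)])

  // Same sample records as in the question
  val records: List[Rec] = List(
    (600, List((600, 111, 7, 1), (615, 111, 3, 5))),
    (601, List((622, 112, 2, 1), (615, 111, 3, 5), (456, 111, 9, 12)))
  )

  // Hypothetical stand-in for the Redis hashes: key "id:<n>" -> (field -> value).
  // The dates are made up; "id:9" is left out on purpose to show a missing key.
  val fakeRedis: Map[String, Map[String, String]] = Map(
    "id:7" -> Map("start_date" -> "2014", "end_date" -> "2015"),
    "id:3" -> Map("start_date" -> "2015", "end_date" -> "2016"),
    "id:2" -> Map("start_date" -> "2010", "end_date" -> "2015")
  )

  // Hypothetical replacement for Dress.up(client).hget: None if the key or field is absent
  def hget(key: String, field: String): Option[String] =
    fakeRedis.get(key).flatMap(_.get(field))

  // Mirrors the answer: one (start_date, end_date) pair per tuple, keyed by its third field
  def enrich(rec: Rec): (Int, Map[String, List[Product]]) = {
    val (first, second) = rec
    val additionalData = second.map { tuple =>
      (hget("id:" + tuple._3, "start_date"), hget("id:" + tuple._3, "end_date"))
    }
    (first, Map("base_data" -> second, "additional_data" -> additionalData))
  }

  def main(args: Array[String]): Unit =
    records.map(enrich).foreach(println)
}
```

With a real Jedis client, hget returns null for a missing key or field, so wrapping the call as Option(...) gives the same None behavior as this sketch. If two round trips per ID matter, Jedis's hmget(key, fields...) can fetch start_date and end_date in a single call.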