HackerDuck HackerDuck - 3 months ago 20
Scala Question

How to split entries of RDD[(String,List[(String,String,String,String)])]

I need to apply some processing to an rdd with the following format:

RDD[(String,List[(String,String,String,String)])]


Here's are sample entries from the RDD:

(600,List((5,111,1,1), (15,111,1,5), (38,111,2,null))
(700,List((5,111,1,1), (35,111,1,5), (39,111,2,null))


I need to split each entry into several entries based on timestamp values which is found in the first element of each tuple in the List. Each entry should contain timestamps within the interval of 20 minutes.

For example, the first entry should be splitted into 2 entries:

List((5,111,1,1), (15,111,1,5))
List((38,111,2,null))


The final result should be
RDD[(String,List[(String,String,String,String)])]
:

(600,List((5,111,1,1), (15,111,1,5)))
(600,List((38,111,2,null)))
(700,List((5,111,1,1))
(700,List((35,111,1,5), (39,111,2,null))


Any hint how to do this and which functions to apply?

Answer

You can create a splitList function that splits a list from a given record per your desired behavior (not sure I followed it accurately, description is a bit ambiguous), and then use flatMap to "split" each key-value record into several records:

def doStuff() = {
  val input: RDD[(String,List[(String,String,String,String)])] = sc.parallelize(Seq(
    ("600",List(("5","111","1","1"), ("15","111","1","5"), ("38","111","2",null))),
    ("700",List(("5","111","1","1"), ("35","111","1","5"), ("39","111","2",null)))
  ))

  def splitList(l: List[(String,String,String,String)]): Iterable[List[(String,String,String,String)]] = {
    l.groupBy(_._1.toInt / 20).values // or any other logic
  }

  val result = input.flatMap { case (k, l) => splitList(l).map(sublist => (k, sublist)) }

  result.foreach(println)
  // prints: 
  // (600,List((38,111,2,null)))
  // (600,List((5,111,1,1), (15,111,1,5)))
  // (700,List((35,111,1,5), (39,111,2,null)))
  // (700,List((5,111,1,1)))
}