
Mapping RDD to case(Schema) in Spark with Scala

I am new to Scala and Spark, and I have a small problem. I have an RDD with the following schema:

RDD[((String, String), (Int, Timestamp, String, Int))]


and I need to map this RDD to transform it into:

RDD[(Int, String, String, String, Timestamp, Int)]


I wrote the following code for this:

map { case ((pid, name), (id, date, code, level)) => (id, name, code, pid, date, level) }
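
Assuming a local SparkContext and one made-up sample row (both illustrative assumptions, not part of the original question), a minimal self-contained version of this snippet looks roughly like:

import java.sql.Timestamp
import org.apache.spark.{SparkConf, SparkContext}

object MapExample {
  def main(args: Array[String]): Unit = {
    // Local mode and the single sample row are assumptions for illustration only
    val sc = new SparkContext(new SparkConf().setAppName("map-example").setMaster("local[*]"))
    val rdd = sc.parallelize(Seq((("p1", "alice"), (1, new Timestamp(0L), "A", 10))))
    // Rearrange the nested tuples into a flat six-tuple
    val mapped = rdd.map { case ((pid, name), (id, date, code, level)) =>
      (id, name, code, pid, date, level)
    }
    mapped.collect().foreach(println)
    sc.stop()
  }
}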


This mapping works fine. Now I have another RDD:

RDD[((String, String), List[(Int, Timestamp, String, Int)])]


and I want to transform it in the same way as above, into:

RDD[(Int, String, String, String, Timestamp, Int)]


How can I do that? I have tried this code, but it does not work:

map {
  case ((pid, name), List(id, date, code, level)) => (id, name, code, pid, date, level)
}


How can this be achieved?

Answer

Is this what you're looking for?

import java.sql.Timestamp // assuming java.sql.Timestamp for the Timestamp type
import org.apache.spark.rdd.RDD

val input: RDD[((String, String), List[(Int, Timestamp, String, Int)])] = ...
// flatMap emits one flattened output tuple per element of the inner list
val output: RDD[(Int, String, String, String, Timestamp, Int)] = input.flatMap { case ((pid, name), list) =>
  list.map { case (id, date, code, level) =>
    (id, name, code, pid, date, level)
  }
}
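
flatMap rather than map is needed because each element of the inner list must become its own output row: map emits exactly one output per input pair, while flatMap flattens the per-key list into individual records. (The pattern List(id, date, code, level) in the question also matches only lists of exactly four elements, binding each name to a whole tuple rather than to its fields, which is why it did not work.) A quick check with made-up data, given some SparkContext sc (the sample values here are assumptions for illustration):

val input = sc.parallelize(Seq(
  (("p1", "alice"), List(
    (1, new Timestamp(0L), "A", 10),
    (2, new Timestamp(1L), "B", 20)))))
// With the flatMap above, output.collect() yields one row per list element:
// (1,alice,A,p1,<timestamp>,10) and (2,alice,B,p1,<timestamp>,20)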

Alternatively, the same transformation can be written with a for comprehension:

val output: RDD[(Int, String, String, String, Timestamp, Int)] = for {
  ((pid, name), list)     <- input
  (id, date, code, level) <- list
} yield (id, name, code, pid, date, level)
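
Both forms are equivalent: the compiler desugars the for comprehension into essentially the same flatMap/map chain shown above, so it is purely a question of which reads better to you.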