Shams Tabraiz Alam Shams Tabraiz Alam - 11 months ago 53
Scala Question

Filtering RDD and List in Scala

I have an RDD with following structure

RDD[((String), List[(Int,String)])]

And it contain this data

((John),List((4,00A), (5,00A), (15,00B), (15,00C)))
((Root),List((1,00A), (2,00B), (3,00C)))
((Marsh),List((2,00B), (3,00C)))

Now i want to filter it by following rules

1 : If list does not contain '00A' then do not return it

2 : If list contain '00A' then return all '00A' items and also '00C' items in the list. So result should look like this.

((John),List((4,00A), (5,00A), (15,00C)))
((Root),List((1,00A), (3,00C)))

EDIT: Adding code posted in comment.

I have tried this:

val rdd = df
.map{case Row(id: Int, name: String, code: String) => ((name), List((id, code)))}
.reduceByKey(_ ++ _);

val r = rdd
.map{case (t, list) => {
val tempList ={case (id, code) => (id, code)}
val newList ={case (id, "00A") => (id, "00A")
case (id, "00C") => (id, "00C")
case (id, code) => List.empty }

(t, newList)}

Answer Source

It's straightforward

It's not really an RDD/Spark question so I've done it with Lists. If you were using lists, you could do the filter/map as one collect but collect means something quite different for RDD

Assuming your data was really meant to look like this:

 val xs = List(("John",List((4,"00A"), (5,"00A"), (15,"00B"), (15,"00C"))),
            ("Root",List((1,"00A"), (2,"00B"), (3,"00C"))),
            ("Marsh", List((2,"00B"), (3,"00C"))))

Then first we filter to get only the records with "00A" somewhere.

 val filtered = xs.filter{case (key, ys) =>ys.exists(y=>y._2 == "00A")}

Them map over tte result to return only the "00A" and "00C" ones

 val result ={case (key, ys) => 
                             (key, ys.filter(y=>y._2 == "00A" || y._2 == "00C"))}
 //> result  : List[(String, List[(Int, String)])] = List(
 // (John,List((4,00A), (5, 00A), (15,00C))),
 // (Billing,List((7,00A))), 
 // (Root,List((1,00A), (3,00C))))