bigdataenthusiast - 3 months ago
Scala Question

How to filter the data in spark-shell using scala?

I have the data below, which needs to be filtered using Spark (Scala) so that I get only the ids of people who visited "Walmart" but not "Bestbuy". A store may appear more than once for the same id, because a person can visit a store any number of times.

Input Data:

id, store
1, Walmart
1, Walmart
1, Bestbuy
2, Target
3, Walmart
4, Bestbuy

Output Expected:

3, Walmart

I have got the output using DataFrames and running SQL queries on the Spark context. But is there any way to do this using groupByKey / reduceByKey etc. without DataFrames? Can someone help me with the code? After map -> groupByKey, a ShuffledRDD is formed, and I am having difficulty filtering the CompactBuffer!

The code with which I got it using sqlContext is below:


val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.createSchemaRDD

case class Person(id: Int, store: String)

val people = sc.textFile("examples/src/main/resources/people.txt")
.map(_.split(","))
.map(p => Person(p(0).trim.toInt, p(1).trim)) // column 0 is the id, column 1 is the store
people.registerTempTable("people")

val result = sqlContext.sql("select id, store from people left semi join (select id from people where store in('Walmart','Bestbuy') group by id having count(distinct store)=1) sample on people.id=sample.id and people.store='Walmart'")


The code I am trying now is this, but I am stuck after the third step:

val data = sc.textFile("examples/src/main/resources/people.txt")
.map(x=> (x.split(",")(0),x.split(",")(1)))
.filter(!_._1.contains("id")) // drop the header row
val dataGroup = data.groupByKey()
val dataFiltered = dataGroup.map{case (x,y) =>
val url = y.flatMap(x=> x.split(",")).toList
if (!url.contains("Bestbuy") && url.contains("Walmart")){
x.map(x=> (x,y))}}


If I do dataFiltered.collect(), I get:
Array[Any] = Array(Vector((3,Walmart)), (), ())

How can I extract the output after this step?

Answer

To filter an RDD, just use RDD.filter:

val dataGroup = data.groupByKey()

val dataFiltered = dataGroup.filter {
  // keep only lists that contain Walmart but do not contain Bestbuy:
  case (x, y) => val l = y.toList; l.contains("Walmart") && !l.contains("Bestbuy")
}

dataFiltered.foreach(println) // prints: (3,CompactBuffer(Walmart))

// if you want to flatten this back to tuples of (id, store):
val result = dataFiltered.flatMap { case (id, stores) => stores.map(store => (id, store)) }

result.foreach(println) // prints: (3,Walmart)
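
Since the question also mentions reduceByKey, here is a rough sketch of that route. It is an alternative to the groupByKey approach above, not a correction of it. The idea is to turn each store into a pair of Boolean flags and OR them together per id, so no per-key buffer has to be built. It assumes a spark-shell session (where sc is already defined) and uses inline sample data in place of the text file. The names flags, seen and walmartOnly are made up for illustration.

```scala
// Run in spark-shell, where `sc` (the SparkContext) is already defined.
// Inline sample data stands in for the text file from the question.
val data = sc.parallelize(Seq(
  ("1", "Walmart"), ("1", "Walmart"), ("1", "Bestbuy"),
  ("2", "Target"), ("3", "Walmart"), ("4", "Bestbuy")))

// Turn each store into a pair of flags: (sawWalmart, sawBestbuy).
val flags = data.mapValues(store => (store == "Walmart", store == "Bestbuy"))

// Combine the flags per id with a logical OR.
val seen = flags.reduceByKey { case ((w1, b1), (w2, b2)) => (w1 || w2, b1 || b2) }

// Keep only ids that saw Walmart and never saw Bestbuy.
val walmartOnly = seen
  .filter { case (_, (sawWalmart, sawBestbuy)) => sawWalmart && !sawBestbuy }
  .keys

walmartOnly.collect().foreach(println) // prints: 3
```

If the (id, store) form from the expected output is needed, the ids can be mapped back with walmartOnly.map(id => (id, "Walmart")). On very old Spark versions outside the shell, the pair-RDD methods may also need import org.apache.spark.SparkContext._ in scope.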