bigdataenthusiast - 1 year ago
Scala Question

How to filter the data in spark-shell using scala?

I have the data below, which needs to be filtered using Spark (Scala) so that I only get the id of a person who visited "Walmart" but not "Bestbuy". A store may appear more than once for the same id, because a person can visit a store any number of times.

Input Data:

id, store

1, Walmart

1, Walmart

1, Bestbuy

2, Target

3, Walmart

4, Bestbuy

Output Expected:
3, Walmart

I have got the output using DataFrames and running SQL queries on the Spark context. But is there any way to do this using plain RDD operations (map, groupByKey, etc.) without DataFrames? Can someone help me with the code? After map -> groupByKey, the values have been grouped per id and I am having difficulty filtering them.

The code with which I got it using DataFrames is below:

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.createSchemaRDD

case class Person(id: Int, store: String)

// split each line into (id, store) and trim the whitespace after the comma
val people = sc.textFile("examples/src/main/resources/people.txt")
  .map(_.split(","))
  .map(p => Person(p(0).trim.toInt, p(1).trim))
people.registerTempTable("people")

val result = sqlContext.sql("select id, store from people left semi join (select id from people where store in ('Walmart','Bestbuy') group by id having count(distinct store)=1) sample on people.id = sample.id and people.store = 'Walmart'")

The code I am trying now is this, but I am stuck after the third step:

val data = sc.textFile("examples/src/main/resources/people.txt")
.map(x=> (x.split(",")(0),x.split(",")(1)))
val dataGroup = data.groupByKey()
val dataFiltered = dataGroup.map { case (x, y) =>
val url = y.flatMap(x=> x.split(",")).toList
if (!url.contains("Bestbuy") && url.contains("Walmart")){> (x,y))}}

If I do dataFiltered.collect(), I get:
Array[Any] = Array(Vector((3,Walmart)), (), ())

Please help me extract the output after this step.

Answer Source

To filter an RDD, just use RDD.filter:

val dataGroup = data.groupByKey()

val dataFiltered = dataGroup.filter {
  // keep only lists that contain Walmart but do not contain Bestbuy:
  case (x, y) => val l = y.toList; l.contains("Walmart") && !l.contains("Bestbuy")
}

dataFiltered.foreach(println) // prints: (3,CompactBuffer(Walmart))

// if you want to flatten this back to tuples of (id, store):
val result = dataFiltered.flatMap { case (id, stores) => stores.map(store => (id, store)) }

result.foreach(println) // prints: (3, Walmart)
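
If the grouped lists get large, the same check can probably be done without materialising every store per id. The following is only a sketch under a few assumptions: it runs in spark-shell, where `sc` and the pair-RDD implicits are already in scope; the sample rows from the question are inlined with `sc.parallelize` instead of being read from people.txt; and the names `lines`, `pairs`, `flags` and `walmartOnly` are made up for illustration. It also trims both fields, since splitting "3, Walmart" on "," leaves a leading space on the store name, which would make a plain `contains("Walmart")` miss.

```scala
// Assumes a spark-shell session, where `sc` (SparkContext) is already defined.
// The sample rows from the question, inlined instead of read from a file.
val lines = sc.parallelize(Seq(
  "1, Walmart", "1, Walmart", "1, Bestbuy",
  "2, Target", "3, Walmart", "4, Bestbuy"))

// Split each line once and trim both fields, so " Walmart" becomes "Walmart".
val pairs = lines.map { line =>
  val Array(id, store) = line.split(",").map(_.trim)
  (id, store)
}

// Per id, keep two flags (sawWalmart, sawBestbuy) and OR them across all rows.
val flags = pairs
  .mapValues(store => (store == "Walmart", store == "Bestbuy"))
  .reduceByKey { case ((w1, b1), (w2, b2)) => (w1 || w2, b1 || b2) }

// Keep only the ids that visited Walmart and never visited Bestbuy.
val walmartOnly = flags
  .filter { case (_, (sawWalmart, sawBestbuy)) => sawWalmart && !sawBestbuy }
  .keys

walmartOnly.collect().foreach(println) // prints: 3
```

Compared with groupByKey, reduceByKey combines the flags on each partition before the shuffle, so only one small tuple per id and partition is moved around. If you need (id, store) pairs rather than bare ids, map each surviving id to (id, "Walmart").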