Knows Not Much - 3 months ago

Cassandra Spark Connector and filtering data

I am using Spark 1.3.1 and have written a small program to filter data in Cassandra:

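import com.datastax.spark.connector._
import org.apache.spark.SparkContext
import org.joda.time.DateTime

val sc = new SparkContext(conf)
val rdd = sc.cassandraTable("foo", "bar")
val date = DateTime.now().minusHours(1)
val rdd2 = rdd.filter(r => r.getDate("date").after(date.toDate))
println(rdd2.count())
sc.stop()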


This program runs for a very long time, printing messages like

16/09/01 21:10:31 INFO Executor: Running task 46.0 in stage 0.0 (TID 46)
16/09/01 21:10:31 INFO TaskSetManager: Finished task 42.0 in stage 0.0 (TID 42) in 20790 ms on localhost (43/1350)


If I terminate the program and change my code to

val date = DateTime.now().minusHours(1)
val rdd2 = rdd.filter(r => r.getDate("date").after(date.toDate))


It still runs for a very long time with messages like

16/09/01 21:14:01 INFO Executor: Running task 8.0 in stage 0.0 (TID 8)
16/09/01 21:14:01 INFO TaskSetManager: Finished task 4.0 in stage 0.0 (TID 4) in 19395 ms on localhost (5/1350)


So it seems the program always loads the entire Cassandra table into memory (or scans it completely) and only then applies the filter, which seems extremely inefficient to me.

How can I rewrite this code so that Spark doesn't load the entire Cassandra table into an RDD (or scan it completely) before applying the filter?

Answer

Your first piece of code is correct: it counts the RDD returned by filter.

val rdd = sc.cassandraTable("foo", "bar")
val date = DateTime.now().minusDays(30)
rdd.filter(r => r.getDate("date").after(date.toDate)).count // Count Filtered RDD

Be careful, though. RDDs are immutable, so when you apply a filter you must use the RDD it returns, not the one you called filter on. The following, for example, discards the filtered RDD and counts the whole table:


val rdd = sc.cassandraTable("foo", "bar")
val date = DateTime.now().minusDays(30)
rdd.filter(r => r.getDate("date").after(date.toDate)) // Filters RDD
println(rdd.cassandraCount()) // Ignores filtered rdd and counts everything
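The same rule applies to any immutable collection. As a minimal sketch that runs without Spark, a plain Scala List (a stand-in for the RDD, holding made-up values) shows that filter returns a new collection and leaves the original untouched:

```scala
// A plain immutable List stands in for the RDD here, so no Spark is needed.
// Like RDD.filter, List.filter returns a NEW collection and never modifies
// the one it was called on. The values are hypothetical.
object ImmutableFilterDemo {
  val rows: List[Int] = List(5, 12, 30, 45)

  // Result discarded: `rows` is unchanged, mirroring
  // rdd.filter(...) followed by a count of `rdd` itself.
  rows.filter(_ > 20)
  val countIgnoringFilter: Int = rows.size

  // Result kept: the analogue of rdd.filter(...).count.
  val countOfFiltered: Int = rows.filter(_ > 20).size

  def main(args: Array[String]): Unit = {
    println(s"count ignoring filter: $countIgnoringFilter")
    println(s"count of filtered:     $countOfFiltered")
  }
}
```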

For more efficient reads from Cassandra:

If your date column is a clustering key, you can use the .where method to push the predicate down to Cassandra. Beyond that, there isn't much you can do to prune data on the server side.

https://github.com/datastax/spark-cassandra-connector/blob/master/doc/3_selection.md#filtering-rows---where
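Applied to the question's code, the .where approach might look like the minimal sketch below. It assumes that date is a clustering column of foo.bar (as the answer requires), that a Cassandra node is reachable on 127.0.0.1, and that the connector's where(cql, values) on cassandraTable behaves as described in the linked documentation. The object name, the cutoff helper and the configuration values are illustrative, not part of the original question or answer.

```scala
import java.util.Date

import com.datastax.spark.connector._
import org.apache.spark.{SparkConf, SparkContext}
import org.joda.time.DateTime

object WherePushdownSketch {
  // Hypothetical helper: the cutoff timestamp, mirroring
  // DateTime.now().minusHours(1) from the question.
  def cutoff(now: DateTime, hours: Int): Date = now.minusHours(hours).toDate

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("cassandra-where-pushdown")
      // Assumptions: local mode unless spark-submit supplies a master,
      // and a Cassandra node listening on 127.0.0.1.
      .setIfMissing("spark.master", "local[*]")
      .set("spark.cassandra.connection.host", "127.0.0.1")
    val sc = new SparkContext(conf)
    try {
      // .where adds the clause to the CQL query the connector sends, so rows
      // older than the cutoff are filtered out by Cassandra instead of being
      // shipped to Spark and discarded there by rdd.filter.
      // Assumes "date" is a clustering column of foo.bar.
      val recent = sc.cassandraTable("foo", "bar")
        .where("date > ?", cutoff(DateTime.now(), 1))
      println(recent.count())
    } finally {
      sc.stop()
    }
  }
}
```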
