duckertito - 21 days ago
Scala Question

Sum up the values of the DataFrame based on conditions

I have a DataFrame that is created as follows:

val df = sc
  .textFile("s3n://bucket/key/data.txt")
  .map(_.split(","))
  .toDF()


This is the content of data.txt:

123,2016-11-09,1
124,2016-11-09,2
123,2016-11-10,1
123,2016-11-11,1
123,2016-11-12,1
124,2016-11-13,1
124,2016-11-14,1


Is it possible to filter df in order to get the sum of the 3rd-column values for 123 over the last N days, counting back from now? I am interested in a flexible solution, so that N could be defined as a parameter.

For example, if today were 2016-11-16 and N were equal to 5, then the sum of 3rd-column values for 124 would be equal to 2.

EDIT:

This is my current solution:

val df = sc
  .textFile("s3n://bucket/key/data.txt")
  .map(_.split(","))
  .toDF(["key","date","qty"])

val starting_date = LocalDate.now().minusDays(x_last_days)
df.filter(col("key") === "124")
  .filter(to_date(df("date")).gt(starting_date))
  .agg(sum(col("qty")))


but it does not seem to work properly:

1. The line where I define the column names, toDF(["key","date","qty"]), does not compile with Scala 2.10.6 and Spark 1.6.2.
2. It also returns a DataFrame, while I need an Int. Should I just do toString.toInt?

Answer

Neither of the following will compile:

scala> val df = sc.parallelize(Seq("123,2016-11-09,1","124,2016-11-09,2","123,2016-11-10,1","123,2016-11-11,1","123,2016-11-12,1","124,2016-11-13,1","124,2016-11-14,1")).map(_.split(",")).toDF(["key","date","qty"])
// <console>:1: error: illegal start of simple expression
//       val df = sc.parallelize(Seq("123,2016-11-09,1","124,2016-11-09,2","123,2016-11-10,1","123,2016-11-11,1","123,2016-11-12,1","124,2016-11-13,1","124,2016-11-14,1")).map(_.split(",")).toDF(["key","date","qty"])
                                                                                                                                                                                             ^

scala> val df = sc.parallelize(Seq("123,2016-11-09,1","124,2016-11-09,2","123,2016-11-10,1","123,2016-11-11,1","123,2016-11-12,1","124,2016-11-13,1","124,2016-11-14,1")).map(_.split(",")).toDF
// <console>:27: error: value toDF is not a member of org.apache.spark.rdd.RDD[Array[String]]
//       val df = sc.parallelize(Seq("123,2016-11-09,1","124,2016-11-09,2","123,2016-11-10,1","123,2016-11-11,1","123,2016-11-12,1","124,2016-11-13,1","124,2016-11-14,1")).map(_.split(",")).toDF
                                                                                                                                                                                          ^

The first one fails because it isn't correct syntax (Scala has no ["key","date","qty"]-style list literals), and the second fails because, as the error says, toDF is not a member of RDD[Array[String]]; in other words, it isn't supported there.

The latter will compile with Spark 2.x, but the solution below still applies; otherwise you would end up with a DataFrame that has a single column of type ArrayType.
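For illustration only (this sketch is not part of the original answer, and the default column name value is an assumption about Spark 2.x's encoder for Array[String]), the single-column variant would look roughly like this, and you would then have to split the array back into named columns yourself:

import org.apache.spark.sql.functions.col // already imported in the shell

// Spark 2.x only: toDF on an RDD[Array[String]] compiles, but yields one array<string> column.
val raw = sc.parallelize(Seq("123,2016-11-09,1", "124,2016-11-09,2"))
            .map(_.split(","))
            .toDF()
// raw: org.apache.spark.sql.DataFrame = [value: array<string>]

// Pull the array elements out into named columns.
val split = raw.select(
  col("value").getItem(0).as("key"),
  col("value").getItem(1).as("date"),
  col("value").getItem(2).as("qty"))
// split: org.apache.spark.sql.DataFrame = [key: string, date: string, qty: string]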

Now let's solve the issue:

scala> :pa
// Entering paste mode (ctrl-D to finish)
import sqlContext.implicits._ // you don't need to import this in the shell.
val df = sc.parallelize(Seq("123,2016-11-09,1","124,2016-11-09,2","123,2016-11-10,1","123,2016-11-11,1","123,2016-11-12,1","124,2016-11-13,1","124,2016-11-14,1"))
           .map{ _.split(",") match { case Array(a,b,c) => (a,b,c) }}.toDF("key","date","qty")

// Exiting paste mode, now interpreting.

// df: org.apache.spark.sql.DataFrame = [key: string, date: string, qty: string]

You can apply any filter you want and compute the aggregation needed, e.g.:

scala> val df2 = df.filter(col("key") === "124").agg(sum(col("qty")))
// df2: org.apache.spark.sql.DataFrame = [sum(qty): double]

scala> df2.show
// +--------+                                                                      
// |sum(qty)|
// +--------+
// |     4.0|
// +--------+
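
To cover the two remaining points from the question, the parameterised last-N-days window and getting a plain number back instead of a DataFrame, something along the following lines should work. The helper name sumLastNDays is my own, not part of the original answer, and note that a LocalDate has to be converted to a java.sql.Date (or a yyyy-MM-dd string) before it can be used as a literal in Spark 1.6:

import java.sql.Date
import java.time.LocalDate
import org.apache.spark.sql.functions.{col, lit, sum, to_date}

// Hypothetical helper; assumes the df defined above and at least one matching row.
def sumLastNDays(key: String, n: Int): Long = {
  // Take "now" from the driver and convert it to a type Spark accepts as a literal.
  val startingDate = Date.valueOf(LocalDate.now().minusDays(n))
  df.filter(col("key") === key)
    .filter(to_date(col("date")).geq(lit(startingDate)))
    .agg(sum(col("qty").cast("long")))  // qty was parsed as a string, so cast it first
    .first()                            // collapse the single-row result to a Row
    .getLong(0)                         // ...and the Row to a plain Long
}

// With the sample data and today = 2016-11-16, sumLastNDays("124", 5) returns 2.

Casting qty before aggregating is also what turns the double result seen above (a sum over a string column) into an integral value you can use directly.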