WahCheung - 2 months ago
Scala Question

How to join two DataFrames, count rows of the same id and then sort by it

I have two DataFrames (Scala, Apache Spark 1.6.2):

1) plays

+---+
|vid|
+---+
| 1|
| 2|
| 1|
+---+


2) shows

+---+
|vid|
+---+
| 1|
| 1|
| 2|
| 2|
| 3|
| 3|
| 4|
+---+


The 'plays' DataFrame holds the play records of videos and the 'shows' DataFrame holds the show records of videos. How can I merge the two DataFrames to get the total number of shows ("showtimes") and plays ("playtimes") for each vid? The resulting DataFrame should be sorted by playtimes in descending order, like this:

+---+---------+---------+
|vid|showtimes|playtimes|
+---+---------+---------+
| 1| 2| 2|
| 2| 2| 1|
| 3| 2| 0|
| 4| 1| 0|
+---+---------+---------+


I am new to Spark and have encountered many problems doing this. The following spark-shell commands construct the plays and shows tables:

scala> case class Play(vid: Int)

scala> case class Show(vid: Int)

scala> val plays = Seq(Play(1), Play(2), Play(1))

scala> val shows = Seq(Show(1), Show(1), Show(2), Show(2), Show(3), Show(3), Show(4))

scala> import org.apache.spark.sql.SQLContext

scala> val sqlContext = new SQLContext(sc)

scala> val playsDF = sqlContext.createDataFrame(plays)

scala> val showsDF = sqlContext.createDataFrame(shows)

scala> playsDF.registerTempTable("plays")

scala> showsDF.registerTempTable("shows")

Answer

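You can use the following solution (it works on the showsDF and playsDF DataFrames created in the question):

import org.apache.spark.sql.functions._

// count the shows per vid, then left-join the number of plays per vid
showsDF.groupBy("vid").agg(count("vid").as("showtimes"))
        .join(playsDF.groupBy("vid").agg(count("vid").as("playtimes")), Seq("vid"), "left")
        .na.fill(0)                              // vids that were never played get 0 instead of null
        .orderBy(desc("playtimes"), asc("vid"))  // most played first; vid breaks ties
        .show(false)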

Let me explain the parts.

Given the DataFrames

plays

+---+
|vid|
+---+
|1  |
|2  |
|1  |
+---+

and shows

+---+
|vid|
+---+
|1  |
|1  |
|2  |
|2  |
|3  |
|3  |
|4  |
+---+

the following code

showsDF.groupBy("vid").agg(count("vid").as("showtimes"))

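will generate the following (groupBy does not guarantee any row order, which is why the vids are not sorted here):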

+---+---------+
|vid|showtimes|
+---+---------+
|1  |2        |
|3  |2        |
|4  |1        |
|2  |2        |
+---+---------+

and the following code

playsDF.groupBy("vid").agg(count("vid").as("playtimes"))

will generate
+---+---------+
|vid|playtimes|
+---+---------+
|1  |2        |
|2  |1        |
+---+---------+
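To make the groupBy/count step concrete without a Spark session, here is a minimal sketch on plain Scala collections, assuming each DataFrame is reduced to its vid column as a Seq[Int]. The names CountPerVid and countByVid are hypothetical and only illustrate the idea; they are not part of the Spark API. As with the Spark output, a Map has no guaranteed ordering, so only its contents matter.

```scala
// Plain Scala collections (no Spark), mirroring groupBy("vid").agg(count("vid")).
// CountPerVid and countByVid are hypothetical names used for illustration only.
object CountPerVid {
  // The sample vid columns from the question.
  val plays: Seq[Int] = Seq(1, 2, 1)
  val shows: Seq[Int] = Seq(1, 1, 2, 2, 3, 3, 4)

  // Group identical vids together, then count the records in each group.
  def countByVid(vids: Seq[Int]): Map[Int, Int] =
    vids.groupBy(identity).map { case (vid, records) => vid -> records.size }

  // Contains 1 -> 2, 2 -> 2, 3 -> 2, 4 -> 1 (in no particular order).
  val showtimes: Map[Int, Int] = countByVid(shows)
  // Contains 1 -> 2, 2 -> 1 (in no particular order).
  val playtimes: Map[Int, Int] = countByVid(plays)
}
```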

The rest of the code left-joins the two aggregated DataFrames on the vid column and fills the resulting null values with 0: vids 3 and 4 were never played, so after the left join their playtimes is null.
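The join, fill and sort steps can be sketched the same way on plain Scala collections. This is an illustration under the assumption that the per-vid counts are already available as Maps; JoinAndSort, leftJoinFill and sortByPlaytimesDesc are hypothetical names, not Spark functions. Where Spark's left join yields null for a vid with no plays (later replaced by na.fill(0)), the sketch uses getOrElse(vid, 0) directly.

```scala
// Plain Scala collections (no Spark), mirroring join(..., Seq("vid"), "left").na.fill(0)
// followed by a descending sort on playtimes. All names here are hypothetical.
object JoinAndSort {
  // Per-vid counts, as produced by the two groupBy/count steps.
  val showtimes: Map[Int, Int] = Map(1 -> 2, 2 -> 2, 3 -> 2, 4 -> 1)
  val playtimes: Map[Int, Int] = Map(1 -> 2, 2 -> 1)

  // Left join: keep every vid from the left side; a vid missing on the right gets 0.
  def leftJoinFill(left: Map[Int, Int], right: Map[Int, Int]): List[(Int, Int, Int)] =
    left.toList.map { case (vid, s) => (vid, s, right.getOrElse(vid, 0)) }

  // Rows are (vid, showtimes, playtimes): highest playtimes first, vid ascending on ties.
  def sortByPlaytimesDesc(rows: List[(Int, Int, Int)]): List[(Int, Int, Int)] =
    rows.sortBy(r => (-r._3, r._1))

  // Evaluates to List((1, 2, 2), (2, 2, 1), (3, 2, 0), (4, 1, 0)), the table from the question.
  val result: List[(Int, Int, Int)] = sortByPlaytimesDesc(leftJoinFill(showtimes, playtimes))
}
```

One design consequence worth noting: because the join starts from the show counts, a vid that appears in plays but never in shows would be dropped. If that can happen in your data, a full outer join is needed instead of a left join.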

I hope this answer is helpful.