
Spark SQL Group By Consecutive Sequence of Integers

So I have a table from which I want to create events. A user watches a video that is defined as a list of sub_parts, with the number of bytes downloaded recorded for every sub_part requested.

For example, Alice is watching a video with 15 parts of 5 seconds each: she watched the first three parts, then skipped to part 7 and played two more parts, but in the end she never finished the video.

So I want to recreate this trace of events for every user with Spark SQL (most likely with a UDF, but please help me with this; I don't understand how to make it work).

+---+-------+------------+----------+
|   | Name  | Video_part | Bytes Dl |
+---+-------+------------+----------+
| 1 | Alice |          1 |      200 |
| 2 | Alice |          2 |      250 |
| 3 | Alice |          3 |      400 |
| 1 | Alice |          7 |      100 |
| 2 | Alice |          8 |      200 |
| 3 | Bob   |          1 |     1000 |
| 1 | Bob   |         32 |      500 |
| 2 | Bob   |         33 |      400 |
| 3 | Bob   |         34 |      330 |
| 1 | Bob   |         15 |      800 |
| 2 | Bob   |         16 |      400 |
+---+-------+------------+----------+


So what I want is to group consecutive integers in Video_part, which are my play events; when there is a break in the consecutive sequence, it is either a skip_in or a skip_out event. For each play portion I also want the mean of the bytes downloaded:

+---+-------+-------------+----------+--------------+--------------+
|   | Name  | Number_play | Event    | Number_skips | Mean_BytesDL |
+---+-------+-------------+----------+--------------+--------------+
| 1 | Alice |           3 | Play     |            0 |        283.3 |
| 2 | Alice |           0 | Skip_in  |            4 |            0 |
| 3 | Alice |           2 | Play     |            0 |          150 |
| 1 | Bob   |           1 | Play     |            0 |         1000 |
| 2 | Bob   |           0 | Skip_in  |           31 |            0 |
| 3 | Bob   |           3 | Play     |            0 |          410 |
| 2 | Bob   |           0 | Skip_out |           19 |            0 |
| 3 | Bob   |           2 | Play     |            0 |          600 |
+---+-------+-------------+----------+--------------+--------------+


The problem is that I can do it in Python or in Scala, using sub-pandas DataFrames with loops or sublists with map and foreach respectively, but it takes too long to run on 1 TB of data, even on my cluster of nodes.

So I'm wondering whether there is a way to do it in Spark SQL. I have researched a little around UDFs with groupBy, flatMap or agg, but I'm having trouble since this is completely new to me; I hope you can help me somehow!

I was thinking of something like:


  • SortBy Name

  • For every unique name:

  • Aggregate the Video_part with a UDF -> which creates three new columns,
    one of which is the average of the bytes downloaded over the portion



I know this is pretty specific, but maybe someone can help me.

Thanks in advance and have a good day!

Answer Source

Using a UDF would give you row-by-row calculation over the columns you pass to it, and it would be difficult to fulfill your criteria with that.
I suggest you use a Window function instead, in which you can define partitioning, ordering and even frame types:

PARTITION BY ... ORDER BY ... frame_type BETWEEN start AND end

The Databricks documentation and Mastering Apache Spark 2 should be sufficient to start with.
What I can suggest further is a first phase that calculates Mean_BytesDL, for which you can use

Window.partitionBy(col("name")).orderBy(col("Video_part").asc).rowsBetween(<choose rows so that each frame would contain all the consecutive Video_part played>)

You can proceed the same way for the other columns and drop all the unnecessary rows.
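Just to illustrate the window idea (this is a sketch, not the full answer): you can mark the start of every consecutive run with lag and turn a running sum of those marks into a run id. It assumes a hypothetical row_id column that preserves the order in which the parts were requested, and column names follow your sample table; it only produces the Play segments, the Skip_in/Skip_out rows would still need extra handling.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// hypothetical row_id column preserving the order in which parts were requested
val byArrival = Window.partitionBy(col("Name")).orderBy(col("row_id"))

val playRuns = df
  // a new run starts whenever Video_part is not exactly the previous part + 1
  .withColumn("newRun",
    when(col("Video_part") - lag(col("Video_part"), 1).over(byArrival) === 1, lit(0)).otherwise(lit(1)))
  // a running sum of the run starts gives every consecutive stretch its own id
  .withColumn("runId", sum(col("newRun")).over(byArrival))
  .groupBy(col("Name"), col("runId"))
  .agg(count(lit(1)).as("Number_play"), avg(col("Bytes Dl")).as("Mean_BytesDL"))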

Working with a custom frame_type is not impossible, but it is certainly a nightmare.
Meanwhile, I got you a solution using a UDAF, but before that please make sure there is another column that identifies the latest download for each user:

+---+-----+----------+--------+------+
|sn |Name |Video_part|Bytes D1|latest|
+---+-----+----------+--------+------+
|1  |Alice|1         |200     |      |
|2  |Alice|2         |250     |      |
|3  |Alice|3         |400     |      |
|1  |Alice|7         |100     |      |
|2  |Alice|8         |200     |latest|
|3  |Bob  |1         |1000    |      |
|1  |Bob  |32        |500     |      |
|2  |Bob  |33        |400     |      |
|3  |Bob  |34        |330     |      |
|1  |Bob  |15        |800     |      |
|2  |Bob  |16        |400     |latest|
+---+-----+----------+--------+------+
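If that flag is not already in your data, one possible way to derive it is sketched below, again assuming a hypothetical row_id column that reflects the arrival order of the downloads.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// flag the last downloaded part per user, using the hypothetical row_id arrival-order column
val perUser = Window.partitionBy(col("Name"))

val withLatest = df.withColumn("latest",
  when(col("row_id") === max(col("row_id")).over(perUser), lit("latest")).otherwise(lit("")))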

After that, create the UDAF as below:

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

private class MovieAggregateFunction(inputSourceSchema : StructType) extends UserDefinedAggregateFunction {
  // running state for the user currently being aggregated;
  // note this relies on the rows of a group being processed in their original order
  var previousPlay : Int = _
  var previousEvent : String = _
  var playCount : Int = _
  var skipCount : Int = _
  var sum : Double = _
  var finalString : String = _
  var first : Boolean = _

  def inputSchema: StructType = inputSourceSchema

  def bufferSchema: StructType = new StructType().add("finalOutput", StringType)

  def dataType: DataType = StringType

  def deterministic: Boolean = false

  def initialize(buffer: MutableAggregationBuffer): Unit = {
    previousPlay = 0
    previousEvent = "Play"
    playCount = 0
    skipCount = 0
    sum = 0.0
    finalString = ""
    first = true
    buffer.update(0,"")
  }

  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    val sn = input.getInt(0)
    val name = input.getString(1)
    val vPart = input.getInt(2)
    val eventType = getEventType(previousPlay, vPart)
    val dPart = input.getInt(3).toDouble
    val latest = input.getString(4)
    // still inside a consecutive Play run: extend it
    if(previousEvent.equalsIgnoreCase(eventType) && eventType.equalsIgnoreCase("Play")){
      playCount +=1
      sum += dPart
    }
    // the run was broken: emit the finished Play segment plus the Skip_in/Skip_out event
    if(!previousEvent.equalsIgnoreCase(eventType)){
      if(first) {
        finalString = name + "::" + playCount + "::" + previousEvent + "::" + "0" + "::" + sum / playCount + "&&" +
          name + "::" + "0" + "::" + eventType + "::" + skipCount + "::" + "0"
      }
      else{
        finalString = finalString+"&&"+name + "::" + playCount + "::" + previousEvent + "::" + "0" + "::" + sum / playCount +
          "&&" + name + "::" + "0" + "::" + eventType + "::" + skipCount + "::" + "0"
      }
      playCount = 1
      sum = 0
      sum += dPart
      previousEvent = "Play"
      first = false
    }
    // last row for this user: append the final Play segment
    if(latest.equalsIgnoreCase("latest")){
      finalString = finalString + "&&" + name + "::" + playCount + "::" + previousEvent + "::" + skipCount + "::" + sum / playCount
    }
    previousPlay = vPart
    buffer.update(0, finalString)
  }

  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1.update(0, buffer1.getString(0) + buffer2.getString(0))
  }

  def evaluate(buffer: Row): Any = {
    buffer.getString(0)
  }

  def getEventType(firstPlay: Int, secondPlay: Int): String ={
    if(firstPlay < secondPlay && secondPlay - firstPlay == 1){
      skipCount = 0
      "Play"
    }
    else if(firstPlay < secondPlay && secondPlay-firstPlay > 1){
      skipCount = secondPlay - firstPlay
      "Skip_in"
    }
    else if(firstPlay > secondPlay){
      skipCount = firstPlay - secondPlay
      "Skip_out"
    }
    else
      ""
  }
}

Then instantiate the UDAF with the input schema and apply it as the aggregation function:

import org.apache.spark.sql.functions.col

val udaf = new MovieAggregateFunction(df.schema)
df = df.groupBy("Name").agg(udaf(col("sn"), col("Name"), col("Video_part"), col("Bytes D1"), col("latest")).as("aggOut"))

The output until now is

+-----+------------------------------------------------------------------------------------------------------------------------+
|Name |aggOut                                                                                                                  |
+-----+------------------------------------------------------------------------------------------------------------------------+
|Bob  |Bob::1::Play::0::1000.0&&Bob::0::Skip_in::31::0&&Bob::3::Play::0::410.0&&Bob::0::Skip_out::19::0&&Bob::2::Play::0::600.0|
|Alice|Alice::3::Play::0::283.3333333333333&&Alice::0::Skip_in::4::0&&Alice::2::Play::0::150.0                                 |
+-----+------------------------------------------------------------------------------------------------------------------------+

We already have the desired output. Now, to turn the aggOut column into separate rows and columns, convert it to an RDD, split the strings, and convert it back to a DataFrame as below:

// split each aggregated string into one row per event, then each event string into its fields
val lineRdd = df.rdd.flatMap(row => row(1).toString.split("&&").toList)
val valueRdd = lineRdd.map(line => Row.fromSeq(line.split("::").toSeq))
val outputFields = Vector("Name", "Number_play", "Event", "Number_skips", "Mean_bytesDL")
val schema = StructType(outputFields.map(field => StructField(field, StringType, nullable = true)))
df = sqlContext.createDataFrame(valueRdd, schema)
df.show(false)

The final output is

+-----+-----------+--------+------------+-----------------+
|Name |Number_play|Event   |Number_skips|Mean_bytesDL     |
+-----+-----------+--------+------------+-----------------+
|Bob  |1          |Play    |0           |1000.0           |
|Bob  |0          |Skip_in |31          |0                |
|Bob  |3          |Play    |0           |410.0            |
|Bob  |0          |Skip_out|19          |0                |
|Bob  |2          |Play    |0           |600.0            |
|Alice|3          |Play    |0           |283.3333333333333|
|Alice|0          |Skip_in |4           |0                |
|Alice|2          |Play    |0           |150.0            |
+-----+-----------+--------+------------+-----------------+

Note: the final data types are all String; you can change them according to your needs.
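For example, a minimal sketch of casting them afterwards:

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{DoubleType, IntegerType}

// cast the all-String output columns to numeric types
val typedDf = df
  .withColumn("Number_play", col("Number_play").cast(IntegerType))
  .withColumn("Number_skips", col("Number_skips").cast(IntegerType))
  .withColumn("Mean_bytesDL", col("Mean_bytesDL").cast(DoubleType))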
