khrystal - 1 month ago
Scala Question

Concatenating MapType values while doing groupBy on a DataFrame

I have this DataFrame, which contains three columns: userId, date and generation.

+-------+--------+----------------------------------------------------------------------------+
|userId | date |generation |
+-------+--------+----------------------------------------------------------------------------+
|1 |20160926|Map("screen_WiFi" -> 15.127, "upload_WiFi" -> 0.603, "total_WiFi" -> 19.551)|
|1 |20160926|Map("screen_2g" -> 0.573, "upload_2g" -> 0.466, "total_2g" -> 1.419) |
|1 |20160926|Map("screen_3g" -> 10.084, "upload_3g" -> 80.515, "total_3g" -> 175.435) |
+-------+--------+----------------------------------------------------------------------------+
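For anyone who wants to reproduce this, here is a minimal sketch that builds the same three rows as a DataFrame. It assumes a local `SparkSession`, that `date` is stored as a string and that the map values are `Double`; the object name `SampleInput` is hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical helper that recreates the sample rows from the question
object SampleInput {
  // "generation" becomes a MapType(StringType, DoubleType) column
  def build(spark: SparkSession): DataFrame = {
    import spark.implicits._
    Seq(
      (1, "20160926", Map("screen_WiFi" -> 15.127, "upload_WiFi" -> 0.603, "total_WiFi" -> 19.551)),
      (1, "20160926", Map("screen_2g" -> 0.573, "upload_2g" -> 0.466, "total_2g" -> 1.419)),
      (1, "20160926", Map("screen_3g" -> 10.084, "upload_3g" -> 80.515, "total_3g" -> 175.435))
    ).toDF("userId", "date", "generation")
  }
}
```

With that, `val input = SampleInput.build(spark)` gives a DataFrame matching the table.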


I want to group these rows by userId and date, but the problem is the 3rd column, which contains MapType values. The requirement is to combine all the maps of a group into a single map in one column; the final output should look like this:




+-------+--------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|userId |date |generation |
+-------+--------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|1 |20160926|Map("screen_WiFi" -> 15.127, "upload_WiFi" -> 0.603, "total_WiFi" -> 19.551,"screen_2g" -> 0.573, "upload_2g" -> 0.466, "total_2g" -> 1.419, "screen_3g" -> 10.084, "upload_3g" -> 80.515, "total_3g" -> 175.435)|
+-------+--------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+


Is there any way to solve this problem, or any possible workaround?

Answer

You can create a naive User Defined Aggregate Function (UDAF) that combines maps, and then use it as the aggregation function. Since you didn't define how to combine the values of two identical keys, I will assume that keys are unique, i.e. for each userId and date, no key appears in two different records:
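To make that assumption concrete: the aggregation boils down to folding the maps of a group together with `++`, where a key that shows up again simply takes the later value. This plain-Scala sketch (no Spark needed) uses the question's three rows; `mergeWith` is a hypothetical helper showing one way to resolve duplicate keys instead of overriding them, for example by summing, if that were the desired rule.

```scala
object MapMergeSketch {
  // The three "generation" maps from the question, all in the same (userId, date) group
  val rows: Seq[Map[String, Double]] = Seq(
    Map("screen_WiFi" -> 15.127, "upload_WiFi" -> 0.603, "total_WiFi" -> 19.551),
    Map("screen_2g" -> 0.573, "upload_2g" -> 0.466, "total_2g" -> 1.419),
    Map("screen_3g" -> 10.084, "upload_3g" -> 80.515, "total_3g" -> 175.435)
  )

  // Naive combination: ++ keeps every key, and a repeated key takes the right-hand (later) value
  def combineNaive[K, V](maps: Seq[Map[K, V]]): Map[K, V] =
    maps.foldLeft(Map.empty[K, V])(_ ++ _)

  // Hypothetical alternative: resolve a repeated key with f(existing, incoming) instead of overriding
  def mergeWith[K, V](a: Map[K, V], b: Map[K, V])(f: (V, V) => V): Map[K, V] =
    b.foldLeft(a) { case (acc, (k, v)) =>
      acc.updated(k, acc.get(k).map(f(_, v)).getOrElse(v))
    }
}
```

For the question's data `combineNaive(rows)` has all 9 keys. With a clash, `combineNaive(Seq(Map("a" -> 1.0), Map("a" -> 2.0)))` keeps `2.0`, while `mergeWith(Map("a" -> 1.0), Map("a" -> 2.0))(_ + _)` yields `3.0`. A rule like the latter could replace `before ++ toAdd` in `update` if duplicates must be combined.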

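import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{DataType, DoubleType, MapType, StringType, StructType}

/***
  * UDAF combining maps, overriding any duplicate key with the "latest" value
  * @param keyType DataType of the map keys
  * @param valueType DataType of the map values
  * @tparam K key type
  * @tparam V value type
  */
class CombineMaps[K, V](keyType: DataType, valueType: DataType) extends UserDefinedAggregateFunction {
  // a single input column holding a map (dataType is a def, so referencing it here is safe)
  override def inputSchema: StructType = new StructType().add("map", dataType)
  // the buffer holds the map accumulated so far
  override def bufferSchema: StructType = inputSchema
  override def dataType: DataType = MapType(keyType, valueType)
  override def deterministic: Boolean = true

  override def initialize(buffer: MutableAggregationBuffer): Unit = buffer.update(0, Map[K, V]())

  // naive implementation - assuming keys won't repeat, otherwise the later value for a key overrides the earlier one
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    val before = buffer.getAs[Map[K, V]](0)
    // treat a null map (e.g. a null "generation" value) as empty
    val toAdd = Option(input.getAs[Map[K, V]](0)).getOrElse(Map.empty[K, V])
    buffer.update(0, before ++ toAdd)
  }

  // merging two partial buffers is the same operation as adding one more map
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = update(buffer1, buffer2)

  // return the combined map with the declared key/value types
  override def evaluate(buffer: Row): Any = buffer.getAs[Map[K, V]](0)
}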

// instantiate a CombineMaps with the relevant types:
val combineMaps = new CombineMaps[String, Double](StringType, DoubleType)

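// groupBy and aggregate (`input` is the DataFrame from the question);
// the alias keeps the output column named "generation"
val result = input.groupBy("userId", "date").agg(combineMaps(col("generation")).as("generation"))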
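On Spark 2.4 or later, a possible workaround that avoids a custom UDAF altogether (`UserDefinedAggregateFunction` is deprecated as of Spark 3.0) is to explode each map into key/value rows, collect them back per group and rebuild a single map with `map_from_entries`. This is a sketch that assumes the question's column names; the object name `CombineByExplode` is hypothetical.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, collect_list, explode, map_from_entries, struct}

// Hypothetical UDAF-free alternative (requires Spark 2.4+ for map_from_entries)
object CombineByExplode {
  def apply(df: DataFrame): DataFrame =
    df
      // one row per map entry; exploding a map column yields "key" and "value" columns
      .select(col("userId"), col("date"), explode(col("generation")))
      .groupBy("userId", "date")
      // gather the (key, value) entries of each group and turn them back into one map
      .agg(map_from_entries(collect_list(struct(col("key"), col("value")))).as("generation"))
}
```

Two caveats with this approach: `explode` drops rows whose map is null or empty, so a group with only such rows disappears from the result; and on Spark 3.x a duplicate key inside a group makes `map_from_entries` fail under the default `spark.sql.mapKeyDedupPolicy` (`EXCEPTION`) unless it is set to `LAST_WIN`.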