
Binding attribute, tree: dayofmonth(cast(timestamp#122 as date)) in Scala

I have a data frame df = [id: String, value: Int, type: String, timestamp: java.sql.Date].

My data frame:

+----+------+-------+----------------------+
| id | type | value | timestamp            |
+----+------+-------+----------------------+
|  1 | rent |    12 | 2016-09-19T00:00:00Z |
|  1 | rent |    12 | 2016-09-19T00:00:00Z |
|  1 | buy  |    12 | 2016-09-20T00:00:00Z |
|  1 | rent |    12 | 2016-09-20T00:00:00Z |
|  1 | buy  |    12 | 2016-09-18T00:00:00Z |
|  1 | buy  |    12 | 2016-09-18T00:00:00Z |
+----+------+-------+----------------------+


I need the result as:

id : 1
totalValue : 72
typeForDay : {"rent": 2, "buy": 2} --- grouped by id and dayofmonth(col("timestamp")), at most 1 type per day


I tried:

val ddf = df
  .groupBy("id")
  .agg(collect_set("type"),
       sum("value") as "totalValue")

// counts how many times each type occurs in the collected array
val count_by_value = udf { (gti: scala.collection.mutable.WrappedArray[String]) =>
  if (gti == null) null else gti.groupBy(identity).mapValues(_.size)
}

val result = ddf
  .withColumn("typeForDay", count_by_value($"collect_set(type)"))
  .drop("collect_set(type)")


This gives me the error:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 115 in stage 15.0 failed 4 times, most recent failure: Lost task 115.3 in stage 15.0 (TID 1357, ip-172-31-9-47.ec2.internal): org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Binding attribute, tree: dayofmonth(cast(timestamp#122 as date))#137
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:49)
at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:86)
at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:85)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:243)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:243)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:53)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:242)
at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:233)
at org.apache.spark.sql.catalyst.expressions.BindReferences$.bindReference(BoundAttribute.scala:85)
at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection$$anonfun$$init$$2.apply(Projection.scala:62)
at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection$$anonfun$$init$$2.apply(Projection.scala:62)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.<init>(Projection.scala:62)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$newMutableProjection$1.apply(SparkPlan.scala:234)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$newMutableProjection$1.apply(SparkPlan.scala:234)
at org.apache.spark.sql.execution.Exchange.org$apache$spark$sql$execution$Exchange$$getPartitionKeyExtractor$1(Exchange.scala:197)
at org.apache.spark.sql.execution.Exchange$$anonfun$3.apply(Exchange.scala:209)
at org.apache.spark.sql.execution.Exchange$$anonfun$3.apply(Exchange.scala:208)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$21.apply(RDD.scala:728)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$21.apply(RDD.scala:728)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Couldn't find dayofmonth(cast(timestamp#122 as date))#137 in [customerId#81,timestamp#122,benefit#111]
at scala.sys.package$.error(package.scala:27)
at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1$$anonfun$applyOrElse$1.apply(BoundAttribute.scala:92)
at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1$$anonfun$applyOrElse$1.apply(BoundAttribute.scala:86)
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:48)
... 34 more

Answer

Running your code (after a few fixes to make it compile...) doesn't produce the exception you describe in my environment (Spark 1.6.2), but it doesn't produce the desired result either - you can't count the number of days per type on ddf, because ddf is already grouped by id only and the timestamp data is lost at that point.
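To make that concrete, printing the schema of ddf shows that only the grouping key and the aggregates survive, so there is nothing left to derive days from (a rough sketch of what the output would look like; the exact column name of the collected set depends on your agg call):

ddf.printSchema()
// root
//  |-- id: string (nullable = true)
//  |-- collect_set(type): array (nullable = true)
//  |    |-- element: string (containsNull = true)
//  |-- totalValue: long (nullable = true)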

Here's an alternative implementation, using a UDAF (User-Defined Aggregate Function) to "merge" the values of a MapType column into a single Map:

import org.apache.spark.sql.functions._

val toMap = udf { (typ: String, count: Int) => Map(typ -> count) }

val result = df
  // First: group by id AND type, count distinct days and sum value:
  .groupBy("id", "type").agg(countDistinct(dayofmonth(col("timestamp"))) as "daysPerType", sum("value") as "valPerType")
  // Then: convert type and count into a single Map column
  .withColumn("typeForDay", toMap(col("type"), col("daysPerType")))
  // Lastly: use a custom aggregation function to "merge" the maps (assuming keys are unique to begin with!)
  .groupBy("id").agg(sum("valPerType") as "totalValue", CombineMaps(col("typeForDay")) as "typeForDay")

result.show() 
// prints:
// +---+----------+------------------------+
// | id|totalValue|              typeForDay|
// +---+----------+------------------------+
// |  1|        72|Map(buy -> 2, rent -> 2)|
// +---+----------+------------------------+
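If you want typeForDay rendered as the JSON-like string from your expected output rather than as a Spark Map column, a small UDF on top of result can do the formatting - just a sketch, and mapToJson is a name of my own:

// Hypothetical helper: renders the Map[String, Int] column as a JSON-like string,
// e.g. Map(buy -> 2, rent -> 2) becomes {"buy": 2, "rent": 2}
val mapToJson = udf { (m: Map[String, Int]) =>
  if (m == null) null
  else m.map { case (k, v) => "\"" + k + "\": " + v }.mkString("{", ", ", "}")
}

val rendered = result.withColumn("typeForDay", mapToJson(col("typeForDay")))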

And the implementation of CombineMaps:

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

object CombineMaps extends UserDefinedAggregateFunction {
  override def inputSchema: StructType = new StructType().add("map", dataType)
  override def bufferSchema: StructType = inputSchema
  override def dataType: DataType = MapType(StringType, IntegerType)
  override def deterministic: Boolean = true

  override def initialize(buffer: MutableAggregationBuffer): Unit = buffer.update(0, Map[String, Int]())

  // naive implementation - assuming keys won't repeat, otherwise later value for key overrides earlier one
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    val before = buffer.getAs[Map[String, Int]](0)
    val toAdd = input.getAs[Map[String, Int]](0)
    val result = before ++ toAdd
    buffer.update(0, result)
  }

  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = update(buffer1, buffer2)

  override def evaluate(buffer: Row): Any = buffer.getAs[Map[String, Int]](0)
}
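For this dataset the naive ++ is safe, because the first groupBy makes each (id, type) pair unique, so the maps merged for a given id never share keys. If duplicate keys were possible, a summing merge could be used instead; this is only a sketch, meant as a drop-in replacement for the update method inside CombineMaps:

  // Sketch: add counts for duplicate keys instead of letting the later value win
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    val before = buffer.getAs[Map[String, Int]](0)
    val toAdd = input.getAs[Map[String, Int]](0)
    val merged = toAdd.foldLeft(before) { case (acc, (k, v)) =>
      acc + (k -> (acc.getOrElse(k, 0) + v))
    }
    buffer.update(0, merged)
  }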