Jeremy - 2 months ago
Scala Question

Spark UDAF: update an array of tuples

I'm using Scala + Spark 2.0 and trying to write a UDAF that has an Array of tuples as both its internal buffer and its return type:
...

def bufferSchema: StructType = new StructType().add("midResults",
  ArrayType(StructType(Array(StructField("a", DoubleType), StructField("b", DoubleType)))))

def dataType: DataType = ArrayType(StructType(Array(StructField("a", DoubleType), StructField("b", DoubleType))))


And this is how I update the buffer:

def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
  buffer(0) = buffer.getAs[mutable.WrappedArray[(Double, Double)]](3) ++ Array((3.0, 4.0))
}


But I get the following exception:

java.lang.ArrayStoreException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema


This pattern works if I have a simple Array of Double.

Answer

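java.lang.ArrayStoreException is "thrown to indicate that an attempt has been made to store the wrong type of object into an array of objects", and this is expected, because the local Scala type for StructType is o.a.s.sql.Row, not a tuple. The elements already stored in the buffer are Rows (GenericRowWithSchema in the message), so when ++ copies them into a new array of (Double, Double) the JVM rejects the store; with a plain Array of Double the elements really are Doubles, which is why that case works. In other words, you should use Seq[Row] as the buffer field and Row as the element value.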
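A minimal sketch of the fix, assuming the Spark 2.x UserDefinedAggregateFunction API: the buffer and the result are declared as array<struct<a: double, b: double>>, and every value put into them is a Row rather than a tuple. The class name CollectPairs, the input column x and the pair (x, x * 2) are hypothetical, only there to give update something to append. It keeps the question's append-per-row approach for clarity, so it still copies the buffer on every update, which the first note below warns about.

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

// Hypothetical UDAF: collects (x, x * 2) for each input x into array<struct<a: double, b: double>>
class CollectPairs extends UserDefinedAggregateFunction {
  // Element type shared by the buffer and the result
  private val pairType = StructType(Array(StructField("a", DoubleType), StructField("b", DoubleType)))

  // "x" is a hypothetical input column name
  def inputSchema: StructType = new StructType().add("x", DoubleType)
  def bufferSchema: StructType = new StructType().add("midResults", ArrayType(pairType))
  def dataType: DataType = ArrayType(pairType)
  def deterministic: Boolean = true

  def initialize(buffer: MutableAggregationBuffer): Unit =
    buffer(0) = Seq.empty[Row]

  def update(buffer: MutableAggregationBuffer, input: Row): Unit =
    if (!input.isNullAt(0)) {
      val x = input.getDouble(0)
      // Struct elements come back as Row and must be written back as Row, not as tuples
      buffer(0) = buffer.getSeq[Row](0) :+ Row(x, x * 2)
    }

  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit =
    buffer1(0) = buffer1.getSeq[Row](0) ++ buffer2.getSeq[Row](0)

  // Returning Seq[Row] matches dataType = ArrayType(StructType(...))
  def evaluate(buffer: Row): Any = buffer.getSeq[Row](0)
}
```

It is applied like any UDAF, e.g. `val collectPairs = new CollectPairs; df.agg(collectPairs(col("x")))`.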

Notes:

  • Calling ++ in a loop is probably not the best idea: every call copies the whole buffer, so the total cost grows quadratically with the number of rows.
  • Writing a UDAF for this is somewhat obsolete, since as of Spark 2.0 collect_list supports complex types.
  • Arguably, typed Aggregators are much more user-friendly than UserDefinedAggregateFunctions.
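If the aggregation only needs to gather the pairs, the collect_list route mentioned in the notes can be sketched as follows, assuming a DataFrame with hypothetical columns key, a and b: struct builds each element and collect_list gathers them into array<struct<a: double, b: double>>, so no UDAF is involved. The object and function names are hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, collect_list, struct}

object CollectListSketch {
  // Groups by the hypothetical "key" column and gathers (a, b) into array<struct<a: double, b: double>>
  def collectPairs(df: DataFrame): DataFrame =
    df.groupBy("key").agg(collect_list(struct(col("a"), col("b"))).as("midResults"))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("collect-list-sketch").getOrCreate()
    // Hypothetical input rows: (key, a, b)
    val df = spark.createDataFrame(Seq(("k1", 1.0, 2.0), ("k1", 3.0, 4.0), ("k2", 5.0, 6.0))).toDF("key", "a", "b")
    collectPairs(df).show(false)
    spark.stop()
  }
}
```

collect_list makes no guarantee about element order within a group, so if order matters it has to be carried in the struct and restored afterwards.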
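For comparison, a sketch of the same kind of aggregation as a typed Aggregator, assuming Spark 2.x. The case classes Mid and MidResults, the object CollectPairsAgg and the pair (x, x * 2) are hypothetical; wrapping the Seq in a case class lets Encoders.product derive the buffer and output encoders, and the buffer then holds ordinary Scala objects instead of Rows.

```scala
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator

// Hypothetical typed element and buffer; the case class wrapper gives Encoders.product a Product type
case class Mid(a: Double, b: Double)
case class MidResults(values: Seq[Mid])

// Hypothetical aggregator: collects Mid(x, x * 2) for every input x
object CollectPairsAgg extends Aggregator[Double, MidResults, MidResults] {
  def zero: MidResults = MidResults(Seq.empty)
  def reduce(buf: MidResults, x: Double): MidResults = MidResults(buf.values :+ Mid(x, x * 2))
  def merge(b1: MidResults, b2: MidResults): MidResults = MidResults(b1.values ++ b2.values)
  def finish(buf: MidResults): MidResults = buf
  def bufferEncoder: Encoder[MidResults] = Encoders.product[MidResults]
  def outputEncoder: Encoder[MidResults] = Encoders.product[MidResults]
}
```

On a Dataset[Double] it is used as `ds.select(CollectPairsAgg.toColumn)`; since the buffer is a case class, there is no Row-versus-tuple mismatch to run into.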