Knight71 - 11 months ago
Scala Question

What is causing the sort function to be non-serializable in Spark?

I am getting a "Task not serializable" error for the code below.
However, if I pass the function directly, I don't get any error.

import org.apache.spark.rdd.RDD

abstract class MyAbstractClass[T, SortOrder](implicit ord: Ordering[SortOrder]) {
  def getSorterFunc(): (T) => SortOrder

  def sort(rdd: RDD[List[T]]) = {
    val sortFunc = getSorterFunc()
    rdd.map(x => x.sortBy(sortFunc))
  }
}

object SampleObject extends MyAbstractClass[(String, Int, List[Int]), (String, Int)] {
  def getSorterFunc() = {
    case (username, id, addresses) => (username, id)
  }
}

val data = Array(("user1",1,List(12,211)),("u2",1,List(12,211)),("u1",2,List(12,211))).toList
val dataList = Array(data,data)
val rdd = sc.parallelize(dataList)
// This works fine
rdd.map(x => x.sortBy{ case (username, id, addresses) => (username, id) })
// This gives the "Task not serializable" error
SampleObject.sort(rdd)


org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:324)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:323)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)

What is causing the error?

Answer

The culprit here is the implicit ord: Ordering[SortOrder]. The sortBy call inside the anonymous function (which Spark must serialize) implicitly takes ord as an argument, and because ord is a constructor parameter of MyAbstractClass, using it there means reading this.ord. The closure therefore captures the enclosing object, so SampleObject must be serializable.

This is why using the local variable sortFunc isn't enough: it does relieve Spark of the need to serialize the object on which getSorterFunc is called, but ord still has to be taken care of.
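The capture can be reproduced without Spark. The following is a minimal sketch, not Spark code: it assumes Scala 2.12 or later (where function literals compile to serializable lambdas), replaces the RDD with a plain List, and checks serializability with plain Java serialization (ObjectOutputStream), which is what Spark's closure check relies on by default. The names MemberOrdCapture, SorterWithMemberOrd, closure and canSerialize are hypothetical. Even with the key function copied into the local sortFunc, the lambda still reads the ord field and so captures the non-serializable instance.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object MemberOrdCapture {
  type Row = (String, Int, List[Int])

  // Hypothetical stand-in for MyAbstractClass/SampleObject: the Ordering is a
  // constructor implicit and the class does NOT extend Serializable.
  class SorterWithMemberOrd(implicit ord: Ordering[(String, Int)]) {
    def getSorterFunc(): Row => (String, Int) = { case (user, id, _) => (user, id) }

    // Builds the same kind of lambda that sort passes to rdd.map.
    def closure: List[Row] => List[Row] = {
      val sortFunc = getSorterFunc() // local copy, as in the question
      x => x.sortBy(sortFunc)        // implicit Ordering resolves to the field this.ord
    }
  }

  // Hypothetical helper: true if Java serialization can write obj.
  def canSerialize(obj: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    val f = new SorterWithMemberOrd().closure
    println(f(List(("u2", 1, Nil), ("u1", 2, Nil), ("u1", 1, Nil)))) // sorting itself works
    println(canSerialize(f)) // but the lambda drags the enclosing instance along
  }
}
```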

As mentioned in the comments, you can fix it by making SampleObject (or MyAbstractClass) extend Serializable.
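A sketch of that fix under the same assumptions (Scala 2.12+, plain Java serialization, a List in place of the RDD, hypothetical names SerializableFix, SerializableSorter, closure and canSerialize): once the enclosing class extends Serializable, capturing this is harmless, because its only field, the Ordering, is serializable as well (scala.math.Ordering extends Serializable).

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object SerializableFix {
  type Row = (String, Int, List[Int])

  // Same shape as MyAbstractClass, but now extending Serializable.
  class SerializableSorter(implicit ord: Ordering[(String, Int)]) extends Serializable {
    def getSorterFunc(): Row => (String, Int) = { case (user, id, _) => (user, id) }

    def closure: List[Row] => List[Row] = {
      val sortFunc = getSorterFunc()
      x => x.sortBy(sortFunc) // still captures this, which can now be written
    }
  }

  // Hypothetical helper: true if Java serialization can write obj.
  def canSerialize(obj: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit =
    println(canSerialize(new SerializableSorter().closure))
}
```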

Another option is to move the implicit parameter from the class (where it becomes a member, so any closure that uses it "carries" the whole instance along when serialized) to the method:

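import org.apache.spark.rdd.RDD

abstract class MyAbstractClass[T, SortOrder] {
  def getSorterFunc(): (T) => SortOrder

  // ord is a method parameter now, so the closure captures only sortFunc and ord
  def sort(rdd: RDD[List[T]])(implicit ord: Ordering[SortOrder]): RDD[List[T]] = {
    val sortFunc = getSorterFunc()
    rdd.map(x => x.sortBy(sortFunc))
  }
}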

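This works without having to serialize the class, since the closure now captures only the local values sortFunc and ord rather than the enclosing instance.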
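The method-parameter version can be checked the same way. Again this is a sketch under stated assumptions (Scala 2.12+, plain Java serialization, a List in place of the RDD, hypothetical names MethodOrdFix, SorterWithMethodOrd, closure and canSerialize): the class is deliberately left non-serializable, yet the lambda can be written because it no longer refers to this.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object MethodOrdFix {
  type Row = (String, Int, List[Int])

  // Deliberately NOT Serializable; the Ordering is now a method parameter.
  class SorterWithMethodOrd {
    def getSorterFunc(): Row => (String, Int) = { case (user, id, _) => (user, id) }

    def closure(implicit ord: Ordering[(String, Int)]): List[Row] => List[Row] = {
      val sortFunc = getSorterFunc()
      x => x.sortBy(sortFunc) // captures the locals sortFunc and ord, not this
    }
  }

  // Hypothetical helper: true if Java serialization can write obj.
  def canSerialize(obj: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit =
    println(canSerialize(new SorterWithMethodOrd().closure))
}
```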