Ramakrishna Ramakrishna - 1 month ago 15
Scala Question

Spark: java.io.NotSerializableException: org.apache.avro.Schema$RecordSchema

I am creating an Avro RDD with the following code.

def convert2Avro(data: String, schema: Schema): AvroKey[GenericRecord] = {
  val wrapper = new AvroKey[GenericRecord]()
  val record = new GenericData.Record(schema)
  record.put("empname", "John")
  wrapper.datum(record)
  wrapper
}


and creating the Avro RDD as follows.

var avroRDD = fieldsRDD.map(x =>(convert2Avro(x, schema)))


While executing, I get the following exception at the line above:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1242)
at org.apache.spark.rdd.RDD.map(RDD.scala:270)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:331)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: org.apache.avro.Schema$RecordSchema
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)


Any pointers?

Answer

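The Schema.RecordSchema class does not implement Serializable (at least in the Avro version that produced the stack trace above), so Spark cannot ship it over the network as part of the task closure. A workaround is to convert the schema to a string, pass that string to the method, and reconstruct the Schema object inside the method.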

val schemaString = schema.toString

val avroRDD = fieldsRDD.map(x => convert2Avro(x, schemaString))

Inside the method, reconstruct the schema:

def convert2Avro(data: String, schemaString: String): AvroKey[GenericRecord] = {
  // Rebuild the Schema on the executor from its JSON string form
  val schema = new Schema.Parser().parse(schemaString)
  val wrapper = new AvroKey[GenericRecord]()
  val record = new GenericData.Record(schema)
  record.put("empname", "John")
  wrapper.datum(record)
  wrapper
}
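Note that convert2Avro as written parses the schema string once per record. A possible refinement, sketched here under some assumptions, is to parse it once per partition and reuse the resulting Schema for every row. The object name AvroConversion, the helper convertPartition, and the one-field Employee schema are hypothetical names introduced only for this illustration, and the sketch stores each input row in "empname" rather than the fixed value "John".

```scala
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.avro.mapred.AvroKey

object AvroConversion {
  // Hypothetical one-field schema, used only to make the sketch self-contained.
  val exampleSchemaJson: String =
    """{"type":"record","name":"Employee","fields":[{"name":"empname","type":"string"}]}"""

  // Parses the schema string once, then lazily converts every row of the partition.
  def convertPartition(rows: Iterator[String],
                       schemaString: String): Iterator[AvroKey[GenericRecord]] = {
    val schema = new Schema.Parser().parse(schemaString)
    rows.map { row =>
      val record = new GenericData.Record(schema)
      // Assumption: each input row holds the employee name.
      record.put("empname", row)
      new AvroKey[GenericRecord](record)
    }
  }
}
```

With Spark this could be wired in as fieldsRDD.mapPartitions(rows => AvroConversion.convertPartition(rows, schemaString)), so that only the plain String is captured by the closure.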