D.Asare - 1 month ago
Java Question

newAPIHadoopRDD Task not serializable

I keep getting a Task not serializable error. I'm using the mongo-hadoop connector inside a Java Spark application.

The error is shown below:

16/10/12 17:43:34 INFO SparkContext: Created broadcast 0 from newAPIHadoopRDD at DataframeExample.java:47
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2021)
    at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:314)
    at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:313)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
    at org.apache.spark.rdd.RDD.map(RDD.scala:313)
    at org.apache.spark.api.java.JavaRDDLike$class.map(JavaRDDLike.scala:93)
    at org.apache.spark.api.java.AbstractJavaRDDLike.map(JavaRDDLike.scala:47)
    at com.hbfinance.DataframeExample.run(DataframeExample.java:54)
    at com.hbfinance.DataframeExample.main(DataframeExample.java:88)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:672)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: com.hbfinance.DataframeExample
Serialization stack:
    - object not serializable (class: com.hbfinance.DataframeExample, value: com.hbfinance.DataframeExample@1f3165e7)
    - field (class: com.hbfinance.DataframeExample$1, name: this$0, type: class com.hbfinance.DataframeExample)
    - object (class com.hbfinance.DataframeExample$1, com.hbfinance.DataframeExample$1@1866da85)
    - field (class: org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, name: fun$1, type: interface org.apache.spark.api.java.function.Function)
    - object (class org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, <function1>)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:84)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
    ... 22 more


Here is my code:

package com.hbfinance;

import org.apache.hadoop.conf.Configuration;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.bson.BSONObject;

import com.mongodb.hadoop.MongoInputFormat;

import scala.Tuple2;

public class DataframeExample {

    public void run() {
        JavaSparkContext sc = new JavaSparkContext(new SparkConf());

        // Set configuration options for the MongoDB Hadoop Connector.
        Configuration mongodbConfig = new Configuration();
        // MongoInputFormat allows us to read from a live MongoDB instance.
        // We could also use BSONFileInputFormat to read BSON snapshots.
        mongodbConfig.set("mongo.job.input.format", "com.mongodb.hadoop.MongoInputFormat");

        // MongoDB connection string naming a collection to use.
        // If using BSON, use "mapred.input.dir" to configure the directory
        // where BSON files are located instead.
        mongodbConfig.set("mongo.input.uri",
                "mongodb://hadoopUser:Pocup1ne9@localhost:27017/hbdata.ppt_logs");
        // mongodbConfig.set("mongo.input.uri",
        //         "mongodb://hadoopUser:Pocup1ne9@localhost:27017/hbdata.ppa_logs");
        // mongodbConfig.set("mongo.input.uri",
        //         "mongodb://hadoopUser:Pocup1ne9@localhost:27017/hbdata.dd_logs");
        // mongodbConfig.set("mongo.input.uri",
        //         "mongodb://hadoopUser:Pocup1ne9@localhost:27017/hbdata.fav_logs");
        // mongodbConfig.set("mongo.input.uri",
        //         "mongodb://hadoopUser:Pocup1ne9@localhost:27017/hbdata.pps_logs");

        // Create an RDD backed by the MongoDB collection.
        JavaPairRDD<Object, BSONObject> documents = sc.newAPIHadoopRDD(
                mongodbConfig,          // Configuration
                MongoInputFormat.class, // InputFormat: read from a live cluster.
                Object.class,           // Key class
                BSONObject.class        // Value class
        );

        // The anonymous Function below implicitly captures the enclosing
        // DataframeExample instance, which is what fails to serialize.
        JavaRDD<AppLog> logs = documents.map(
                new Function<Tuple2<Object, BSONObject>, AppLog>() {
                    public AppLog call(final Tuple2<Object, BSONObject> tuple) {
                        AppLog log = new AppLog();
                        BSONObject header = (BSONObject) tuple._2().get("headers");

                        log.setTarget((String) header.get("target"));
                        log.setAction((String) header.get("action"));

                        return log;
                    }
                }
        );

        SQLContext sqlContext = new SQLContext(sc);

        DataFrame logsSchema = sqlContext.createDataFrame(logs, AppLog.class);
        logsSchema.registerTempTable("logs");

        DataFrame groupedMessages = sqlContext.sql(
                "select target, action, Count(*) from logs group by target, action");
        // "SELECT to, body FROM messages WHERE to = \"eric.bass@enron.com\"");

        groupedMessages.show();

        logsSchema.printSchema();
    }

    public static void main(final String[] args) {
        new DataframeExample().run();
    }
}

Answer

Your class, DataframeExample, must be serializable. Add implements Serializable and it will work.
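A minimal sketch of the fix, assuming everything else in the class stays exactly as in the question:

import java.io.Serializable;

// Marking the outer class Serializable lets Spark serialize the
// anonymous Function (and, through its this$0 field, this instance).
public class DataframeExample implements Serializable {
    // run() and main() unchanged from the question
}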

Why?

The anonymous class created in your run() method holds an implicit reference to its enclosing instance (the this$0 field shown in the serialization stack), i.e. to DataframeExample. To ship the map function to the executors, Spark must serialize the anonymous class, so it also tries to serialize the outer class, and that fails because DataframeExample does not implement Serializable.
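If you would rather not make the whole class serializable, a common alternative (a sketch, not part of the original answer) is to move the mapping logic into a static nested class. A static class carries no hidden this$0 reference, so only the small function object itself is serialized:

// Hypothetical static nested class inside DataframeExample.
// Spark's Java Function interface already extends Serializable,
// so no extra "implements" clause is needed here.
static class ToAppLog implements Function<Tuple2<Object, BSONObject>, AppLog> {
    public AppLog call(final Tuple2<Object, BSONObject> tuple) {
        AppLog log = new AppLog();
        BSONObject header = (BSONObject) tuple._2().get("headers");
        log.setTarget((String) header.get("target"));
        log.setAction((String) header.get("action"));
        return log;
    }
}

// In run(), replace the anonymous class with:
JavaRDD<AppLog> logs = documents.map(new ToAppLog());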
