THIS USER NEEDS HELP - 2 months ago
Java Question

NotSerializableException for Jackson ObjectNode in Spark closure issue

Say I have the following Java object that maps to Jackson full data binding:

import java.io.Serializable;
import com.fasterxml.jackson.databind.node.ObjectNode;

public class Student implements Serializable {
    private ObjectNode name; // two keys: "first_name", "last_name"

    // getter and setter ...
}


And I have the following Spark code that attempts to serialize the closure variable student of type Student due to different scopes.

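import org.apache.spark.sql.DataFrame

class A(student: Student) extends Serializable {
  def process(input: DataFrame): Unit = {
    // this closure captures student, so Spark must serialize it
    val test = input.rdd.map { row =>
      print(student)
    }
  }
}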


which gives the following error:
Caused by: java.io.NotSerializableException: com.fasterxml.jackson.databind.node.ObjectNode


I understand why I am getting this error. Basically, Spark serializes each closure, together with every out-of-scope variable it captures (here, student), and sends it to the executors. But since ObjectNode itself is not Serializable, the Student instance cannot be serialized and sent to the executors.
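Since Spark (2.0 and later) ships closures with plain Java serialization, the same failure can be reproduced without Spark at all. A minimal sketch, in which the hypothetical Holder class plays the role of Student and a plain java.lang.Object stands in for any non-Serializable field type such as ObjectNode:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class ClosureSerializationDemo {
    // Hypothetical stand-in for Student: Serializable itself, but holding a
    // field whose runtime type (java.lang.Object) is not Serializable.
    static class Holder implements Serializable {
        private static final long serialVersionUID = 1L;
        private final Object payload = new Object();
    }

    // Java serialization walks the whole object graph; every reachable
    // non-transient field value must be Serializable.
    static byte[] serialize(Object o) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(o);
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        try {
            serialize(new Holder());
            System.out.println("serialized");
        } catch (NotSerializableException e) {
            // the message is the name of the offending class
            System.out.println("NotSerializableException: " + e.getMessage());
        }
    }
}
```

Running main prints NotSerializableException: java.lang.Object, the same exception type Spark reports for ObjectNode.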

My question is, what are the ways I can solve this?

I have tried using Map<String, String> instead of ObjectNode, but since ObjectNode's put and set methods only accept "primitives" and JsonNode as values, I get an error when I try something like this:

ObjectNode meta_info = JsonNodeFactory.instance.objectNode();
// does not compile when getName() returns Map<String, String>: set() expects a JsonNode value
meta_info.set("field name", student.getName());

Answer

There are several options.

If you need the ObjectNode only for JSON serialization, you can rewrite your Student class and remove ObjectNode completely. In your example, you can substitute it with a plain object that has firstName and lastName fields:

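import java.io.Serializable;
import com.fasterxml.jackson.annotation.JsonProperty;

class Name implements Serializable {
    @JsonProperty("first_name") // maps the JSON key to this field
    String firstName;
    @JsonProperty("last_name")
    String lastName;
}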
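A minimal sketch of why this works: once every field in the object graph is Serializable, Java serialization (which Spark uses for closures) can copy a Student without error. The Student constructor, getName and the roundTrip helper below are illustrative, and the Jackson annotations are left out so the example needs only the JDK:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class NameRoundTrip {
    // Plain value object replacing ObjectNode; String fields are Serializable.
    static class Name implements Serializable {
        private static final long serialVersionUID = 1L;
        String firstName;
        String lastName;

        Name(String firstName, String lastName) {
            this.firstName = firstName;
            this.lastName = lastName;
        }
    }

    // Illustrative Student: every field in its object graph is now Serializable.
    static class Student implements Serializable {
        private static final long serialVersionUID = 1L;
        private final Name name;

        Student(Name name) {
            this.name = name;
        }

        Name getName() {
            return name;
        }
    }

    // Java serialization round trip, roughly what Spark does when it ships a captured Student.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T value) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        Student copy = roundTrip(new Student(new Name("Ada", "Lovelace")));
        System.out.println(copy.getName().firstName + " " + copy.getName().lastName); // Ada Lovelace
    }
}
```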

However, if this is not possible, you can implement custom serialization like this:

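import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

public class Student implements Serializable {
    // static, so it is never part of the serialized state; ObjectMapper is thread-safe once configured
    private static final ObjectMapper MAPPER = new ObjectMapper();

    // transient: skipped by default Java serialization, handled manually below
    private transient ObjectNode name;

    private void writeObject(ObjectOutputStream out) throws IOException {
        out.defaultWriteObject(); // writes all other (non-transient) fields
        // store the ObjectNode as a JSON string (writeUTF is limited to 65535 encoded bytes)
        out.writeUTF(MAPPER.writeValueAsString(name));
    }

    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject(); // reads all other (non-transient) fields
        JsonNode node = MAPPER.readTree(in.readUTF());
        if (node.isNull()) {
            name = null; // name was null when it was serialized
        } else if (node.isObject()) {
            name = (ObjectNode) node;
        } else {
            throw new IOException("malformed name field detected");
        }
    }
}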

In my example I serialized the ObjectNode to a JSON string, but you can of course iterate over the ObjectNode's fields and store each field separately.
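The field-by-field variant can be sketched with the JDK alone. In this sketch the hypothetical NameNode class is a non-Serializable stand-in for ObjectNode, and it assumes every value is a string, as with the question's first_name/last_name keys. With Jackson you would iterate the ObjectNode's fields instead of a Map:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.LinkedHashMap;
import java.util.Map;

public class FieldByFieldDemo {
    // Hypothetical stand-in for ObjectNode: deliberately NOT Serializable,
    // holding only string values.
    static class NameNode {
        final Map<String, String> fields = new LinkedHashMap<>();
    }

    static class Student implements Serializable {
        private static final long serialVersionUID = 1L;

        // transient: default serialization skips it, so it never triggers NotSerializableException
        transient NameNode name = new NameNode();

        private void writeObject(ObjectOutputStream out) throws IOException {
            out.defaultWriteObject();                  // non-transient fields, if any
            out.writeInt(name.fields.size());          // field count first ...
            for (Map.Entry<String, String> e : name.fields.entrySet()) {
                out.writeUTF(e.getKey());              // ... then each key/value pair
                out.writeUTF(e.getValue());
            }
        }

        private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
            in.defaultReadObject();
            name = new NameNode();                     // field initializers do not run on deserialization
            int count = in.readInt();
            for (int i = 0; i < count; i++) {
                String key = in.readUTF();
                String value = in.readUTF();
                name.fields.put(key, value);
            }
        }
    }

    // Serializes and deserializes a Student, roughly what happens when Spark ships it to an executor.
    static Student roundTrip(Student s) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(s);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (Student) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        Student s = new Student();
        s.name.fields.put("first_name", "Ada");
        s.name.fields.put("last_name", "Lovelace");
        System.out.println(roundTrip(s).name.fields); // {first_name=Ada, last_name=Lovelace}
    }
}
```

Compared with the JSON-string approach, this avoids the writeUTF size limit for the whole node, but it only handles flat string values; nested JsonNode values would need their own encoding.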

You can read more about custom serialization (the writeObject and readObject hooks) in the ObjectOutputStream javadoc.

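You can also experiment with a different serializer such as Kryo (set spark.serializer to org.apache.spark.serializer.KryoSerializer). Keep in mind that Spark 2.0 and later always serialize closures with Java serialization, so Kryo only helps for data that Spark serializes itself, for example RDD elements or a broadcast variable holding the Student.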
