DSpark DSpark - 1 month ago 22
Scala Question

Custom Kryo serialization flow not working in Scala Spark

I am new to Scala.
I am trying to implement custom Kryo serialization.

I have two classes and one object:

Operation

package org.agg

import org.apache.spark.{SparkConf, SparkContext}

object Operation {
  def main(args: Array[String]): Unit = {
    // Kryo settings; the registrator is looked up by its fully qualified class name
    val conf = new SparkConf()
      .setAppName("Operation")
      .set("spark.io.compression.codec", "org.apache.spark.io.SnappyCompressionCodec")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryo.registrationRequired", "true")
      .set("spark.kryo.registrator", "org.agg.KryoClass")

    val sc = new SparkContext(conf)
    val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
    println("**********Operation***********")
  }
}


KryoClass

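package org.agg

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

class KryoClass extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    println("**********KryoClass***********")
    kryo.register(classOf[org.agg.KryoSerializeCode])
  }
}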


KryoSerializeCode

package org.agg

class KryoSerializeCode {
  // Runs as part of the constructor, i.e. each time an instance is created
  println("**********KryoSerializeCode*************")
}


My assumption is that, because the Operation object calls set("spark.kryo.registrator", "org.agg.KryoClass"),
Spark should call KryoClass and write the println("**********KryoClass***********") statement to the log file.

The command I use to run the Operation object is:


spark-submit --class org.agg.Operation --master yarn --deploy-mode cluster \
  --num-executors 40 --executor-cores 1 --executor-memory 3400m \
  --files /home/hive-site.xml \
  --jars /usr/iop/4.1.0.0/spark/lib/datanucleus-api-jdo-3.2.6.jar,/usr/iop/4.1.0.0/spark/lib/datanucleus-rdbms-3.2.9.jar,/usr/iop/4.1.0.0/spark/lib/datanucleus-core-3.2.10.jar \
  /home/operation_jar.jar


But after running this, only the print statement in the Operation object is printed, not the ones in KryoClass or KryoSerializeCode.

Does anyone have an idea why the print statements inside KryoClass and KryoSerializeCode are not being executed?

Answer

Spark has the habit of doing things lazily - delaying unnecessary operations until they are actually needed. Seems like the creation of the Kryo serializer falls into that category - since you didn't actually do anything with your newly created SparkContext, Spark didn't bother creating the serializer.
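To make the laziness concrete, here is a minimal plain-Scala analogy. It is not Spark's code: FakeContext, runAction and LazyRegistrationDemo are made-up names, and the lazy val only stands in for "the serializer gets built on first use". It shows how a side effect such as a println can be skipped entirely when nothing ever asks for the value.

```scala
// Plain-Scala analogy of lazy initialization; NOT Spark's actual implementation.
object LazyRegistrationDemo {
  // Collects messages so the order of events can be inspected.
  val log = scala.collection.mutable.ListBuffer.empty[String]

  // Hypothetical stand-in for a context that owns a serializer.
  class FakeContext {
    log += "context created"

    // The initializer runs on first access only, not when the context is constructed.
    lazy val serializer: String = {
      log += "registrator called"
      "kryo"
    }

    // Hypothetical stand-in for a Spark action: it is the first thing that needs the serializer.
    def runAction(): Int = serializer.length
  }

  def main(args: Array[String]): Unit = {
    val ctx = new FakeContext
    println(log.mkString(", ")) // the registrator message is not in the log yet
    ctx.runAction()
    println(log.mkString(", ")) // the registrator message appears after the action
  }
}
```

Creating the context alone never touches the serializer, which mirrors what happens in your main: the SparkContext is built, but nothing forces the registrator to run.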

If you add any Spark action to your Operation.main, e.g.:

sc.parallelize(List(1,2,3)).count()

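You'll see the printout from KryoClass. Note that the println in KryoSerializeCode sits in the class body, which is its constructor, so it only runs when an instance is created (new KryoSerializeCode); registering the class with Kryo does not create one.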

NOTE: because spark.kryo.registrationRequired is set to "true", you might have to register a few more classes, or disable registrationRequired.
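A rough sketch of both options, assuming the classes from the question. Array[KryoSerializeCode] is only an illustrative extra registration (I have not checked which classes your job actually needs), and RelaxedConf is a made-up helper name. In practice, with registrationRequired enabled, Kryo fails with an error naming the unregistered class, and that is the class to add.

```scala
package org.agg

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoRegistrator

// Option 1: keep registrationRequired=true and register every class Kryo complains about.
class KryoClass extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[KryoSerializeCode])
    // Assumption: illustrative extra only; replace with the classes named in the error.
    kryo.register(classOf[Array[KryoSerializeCode]])
  }
}

// Option 2 (hypothetical helper): let Kryo accept unregistered classes.
object RelaxedConf {
  def build(): SparkConf = new SparkConf()
    .setAppName("Operation")
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .set("spark.kryo.registrator", "org.agg.KryoClass")
    // Unregistered classes are then written with their full class name instead of failing.
    .set("spark.kryo.registrationRequired", "false")
}
```

Option 1 keeps the serialized data compact but needs upkeep as the job grows; option 2 is easier to get running at the cost of larger serialized output for unregistered classes.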