duckertito - 4 months ago
Scala Question

Possible serialization error when executing Spark job from cluster with 1 master and 2 workers

I am executing a Spark job on a cluster. While it runs perfectly in local mode, on the cluster it fails on the following line (`testData` here stands for the test-set RDD, whose name was lost from the snippet):

val labelsAndPredictions = => (model.predict(p.features), p.label))

I understand that the issue might be related to the fact that the model object
is inaccessible to the workers. Therefore I tried another approach: I created a serializable object
with two functions:

object Utils extends Serializable {

  var model: GradientBoostedTreesModel = null

  def loadModel(m: GradientBoostedTreesModel): Unit = {
    model = m
  }

  def makePrediction(features: org.apache.spark.mllib.linalg.Vector): Double = {
    val predictedLabel = model.predict(features)
    println("predictedLabel: " + predictedLabel)
    predictedLabel
  }
}
Then I substituted the prediction as follows:

model = GradientBoostedTrees.train(trainingData, boostingStrategy)
Utils.loadModel(model)

val labelsAndPredictions = { x =>
  val prediction = Utils.makePrediction(x.features)
  (x.label, prediction)
}
This is the command that I use to submit the Spark job:

spark-submit --master yarn --deploy-mode cluster --driver-memory 10g --executor-memory 10g --num-executors 2 --conf "spark.executor.extraJavaOptions=-XX:+UseG1GC -XX:+AlwaysPreTouch" --class org.test.Runner s3://mybucket/keys/trainer.jar


The Utils object won't solve the problem, because the model field is initialised on the driver, not on the workers: each executor gets its own copy of the object, in which model is still null.

The last time I checked (1.6.x) models are indeed not serializable.
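The failure mode can be reproduced without Spark at all. Spark must serialize the map closure to ship it to the executors, and serializing the closure drags in everything it captures. A minimal sketch, using a hypothetical FakeModel class as a stand-in for a non-serializable model:

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object SerializationDemo {
  // Stand-in for a model class that does NOT extend Serializable.
  class FakeModel {
    def predict(x: Double): Double = x * 2.0
  }

  // Returns true if serializing a closure that captures the model fails,
  // which is exactly what happens when Spark ships such a closure to workers.
  def closureSerializationFails(): Boolean = {
    val model = new FakeModel
    val closure: Double => Double = x => model.predict(x)

    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try {
      out.writeObject(closure) // attempts to serialize the captured FakeModel
      false
    } catch {
      case _: NotSerializableException => true
    }
  }

  def main(args: Array[String]): Unit =
    println("closure serialization fails: " + closureSerializationFails())
}
```

The same mechanism explains the stack traces Spark produces: the exception is thrown while serializing the task closure, before any work reaches the executors.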

What you can do is predict on a whole RDD of feature vectors at once:

model.predict(

But that won't provide you a nice way to assess the model, because the predictions come back without their labels.

Or use toLocalIterator: { x => (x.label, model.predict(x.features)) }

toLocalIterator iterates over the data of each partition, but on the driver rather than on the workers, which gives you access to your model. Unfortunately it is not parallelised.
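That driver-side pattern can be sketched without a cluster. In this plain-Scala mock-up (Model, the partition data, and labelsAndPredictions are all hypothetical stand-ins, not Spark API), partitions are pulled to the driver one at a time and processed as a single sequential iterator, so referencing the model is safe:

```scala
object LocalIteratorSketch {
  // Stand-in for a non-serializable model that lives only on the driver.
  class Model {
    def predict(feature: Double): Double = feature + 1.0
  }

  // Mimics RDD.toLocalIterator: each inner Seq plays the role of one
  // partition, exposed to the driver as one sequential iterator.
  def labelsAndPredictions(partitions: Seq[Seq[(Double, Double)]],
                           model: Model): List[(Double, Double)] =
    partitions.iterator.flatten.map { case (label, feature) =>
      (label, model.predict(feature)) // runs on the driver, so this is safe
    }.toList

  def main(args: Array[String]): Unit = {
    val parts = Seq(Seq((0.0, 1.0), (1.0, 2.0)), Seq((1.0, 3.0)))
    println(labelsAndPredictions(parts, new Model))
  }
}
```

The trade-off is visible in the structure: because everything flows through one iterator on the driver, no executor-side parallelism is used for the prediction step.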