
Possible serialization error when executing Spark job from cluster with 1 master and 2 workers

I am executing a Spark job on a cluster. While it runs perfectly in local mode, on the cluster it fails at the following line:

val labelsAndPredictions = testData.map(p => (model.predict(p.features), p.label))


I understand that the issue might be related to the fact that the object model is inaccessible to the workers. Therefore I tried another approach: I created a serializable object Utils with two functions:

object Utils {

  var model: GradientBoostedTreesModel = null

  // Called on the driver after training to store the model
  def loadModel(m: GradientBoostedTreesModel): Unit = {
    model = m
  }

  // Called from the map closure, i.e. on the workers
  def makePrediction(features: org.apache.spark.mllib.linalg.Vector): Double = {
    println(model.trees)
    val predictedLabel = model.predict(features)
    println("predictedLabel: " + predictedLabel)
    predictedLabel
  }
}


Then I replaced the prediction step as follows:

model = GradientBoostedTrees.train(trainingData, boostingStrategy)
Utils.loadModel(model)

val labelsAndPredictions = testData.map { x =>
  val prediction = Utils.makePrediction(x.features)
  (x.label, prediction)
}


This is the command that I use to submit the Spark job:

spark-submit --master yarn --deploy-mode cluster --driver-memory 10g --executor-memory 10g --num-executors 2 --conf "spark.executor.extraJavaOptions=-XX:+UseG1GC -XX:+AlwaysPreTouch" --class org.test.Runner s3://mybucket/keys/trainer.jar

Answer

The Utils object won't solve the problem, because the model is initialised statically on the driver, not on the workers. Each executor JVM loads its own copy of Utils, in which model is still null (loadModel only ran on the driver), so makePrediction fails there.

The last time I checked (Spark 1.6.x), these models were indeed not serializable.

What you can do is:

model.predict(testData.map(_.features))

But that only gives you the predictions, with no easy way to pair them back with the labels to assess the model.
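If you do want the label/prediction pairs, one sketch of a workaround is to zip the predictions back with the labels. This assumes model.predict(RDD[Vector]) preserves the per-partition element counts of its input (it is effectively a map over the RDD in 1.6.x), which is what RDD.zip requires:

// Predict on the workers; both sides below are plain maps over testData,
// so they should have the same partition layout and line up element-for-element.
val predictions = model.predict(testData.map(_.features))
val labelsAndPredictions = testData.map(_.label).zip(predictions)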

Or

testData.toLocalIterator.map { x => (x.label, model.predict(x.features)) }

toLocalIterator pulls the data to the driver one partition at a time, so the predictions are computed on the driver, where the model is available. Unfortunately it is not parallelised, and note that it returns a lazy Iterator rather than an RDD.
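For completeness, a sketch of consuming that iterator on the driver, assuming the test set is small enough to materialise there (the MSE computation is illustrative):

// Lazily fetches one partition at a time; predictions run driver-side
val labelsAndPredictions = testData.toLocalIterator.map { x =>
  (x.label, model.predict(x.features))
}.toList  // materialise; assumes the test set fits in driver memory

val testMSE = labelsAndPredictions
  .map { case (label, prediction) => math.pow(label - prediction, 2) }
  .sum / labelsAndPredictions.size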
