I am executing a Spark job on a cluster. While it runs perfectly in local mode, on the cluster it fails at the following line:
val labelsAndPredictions = testData.map(p => (model.predict(p.features), p.label))
The failure happens when calling predict on model. I tried moving the model and the prediction into a Utils object:
object Utils {
  var model: GradientBoostedTreesModel = null

  def loadModel(m: GradientBoostedTreesModel): Unit = {
    model = m
  }

  def makePrediction(features: org.apache.spark.mllib.linalg.Vector): Double = {
    println(model.trees)
    val predictedLabel = model.predict(features)
    println("predictedLabel: " + predictedLabel)
    predictedLabel
  }
}
model = GradientBoostedTrees.train(trainingData, boostingStrategy)
Utils.loadModel(model)
val labelsAndPredictions = testData.map { x =>
  val prediction = Utils.makePrediction(x.features)
  (x.label, prediction)
}
I submit the job with:

spark-submit --master yarn --deploy-mode cluster --driver-memory 10g --executor-memory 10g --num-executors 2 --conf "spark.executor.extraJavaOptions=-XX:+UseG1GC -XX:+AlwaysPreTouch" --class org.test.Runner s3://mybucket/keys/trainer.jar
The Utils object won't solve the problem, because the model is initialised statically on the driver, not on the workers.
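To see why, here is a minimal single-JVM sketch with no Spark involved (ModelHolder is a hypothetical stand-in for your Utils object): a Scala object is a per-JVM singleton, and each executor runs in its own JVM, so an assignment made on the driver never reaches the executors' copies of the singleton.

```scala
// Minimal illustration of the pitfall (no Spark required).
// A Scala `object` is a per-JVM singleton: assigning its field in one JVM
// has no effect in any other JVM. Spark executors are separate JVMs, so
// the executor's copy of Utils.model is still null when map() runs there.
object ModelHolder {
  var model: String = null // stands in for the GradientBoostedTreesModel
}

// What the driver JVM sees after loadModel:
ModelHolder.model = "trained model"

// A freshly started executor JVM, however, initialises its own ModelHolder
// whose model field is still null, hence the NullPointerException in predict.
```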
The last time I checked (Spark 1.6.x), the models are indeed not serializable.
What you can do is:
model.predict(testData.map(_.features))
But that won't give you a nice way to assess the model, because the predictions come back without their labels.
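If you still want (label, prediction) pairs while staying distributed, one workaround is to zip the predictions back with the labels. This is a sketch that assumes the RDD-based predict is a plain element-wise map over its input, so both RDDs keep the same partition count and per-partition element counts, which RDD.zip requires:

```scala
// Sketch: recover (label, prediction) pairs from the RDD-based predict.
// RDD.zip needs identical partition counts and identical element counts per
// partition; both RDDs here are element-wise maps of the same testData, so
// that holds.
val predictions = model.predict(testData.map(_.features))
val labelsAndPredictions = testData.map(_.label).zip(predictions)
```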
Or
testData.toLocalIterator.map { x => (x.label, model.predict(x.features)) }
toLocalIterator iterates over the data one partition at a time, but on the driver rather than on the workers, so the model is available there. Unfortunately it is not parallelised.
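As a usage sketch (the regression-style metric and variable names are illustrative, not from your code), you can then compute a test error on the driver from the local iterator:

```scala
// Sketch: evaluate on the driver. toLocalIterator pulls one partition at a
// time, so memory use is bounded by the largest partition, but the predict
// calls themselves run single-threaded on the driver.
val labelsAndPredictions = testData.toLocalIterator.map { x =>
  (x.label, model.predict(x.features))
}.toSeq

// Mean squared error over the test set (illustrative metric).
val testMSE = labelsAndPredictions
  .map { case (label, prediction) => math.pow(label - prediction, 2) }
  .sum / labelsAndPredictions.size
```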