Rahul Sharma - 3 months ago
Java Question

Spark ML - failing to load a model using MatrixFactorizationModel

I am trying to implement a recommender system using Spark collaborative filtering.

First I train the model and save it to disk:

// Train the model, then persist it so a separate process can load it later.
MatrixFactorizationModel model = trainModel(inputDataRdd);
model.save(jsc.sc(), "/op/tc/model/");


When I load the model from a separate process, the program fails with the exception below:

Code:

static JavaSparkContext jsc;
private static Options options;

static {
    SparkConf conf = new SparkConf().setAppName("TC recommender application");
    conf.set("spark.driver.allowMultipleContexts", "true");
    jsc = new JavaSparkContext(conf);
}

MatrixFactorizationModel model = MatrixFactorizationModel.load(jsc.sc(),
        "/op/tc/model/");


Exception:


Exception in thread "main" java.io.IOException: Not a file:
maprfs:/op/tc/model/data
at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:324)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:199)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1952)
at org.apache.spark.rdd.RDD$$anonfun$aggregate$1.apply(RDD.scala:1114)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
at org.apache.spark.rdd.RDD.aggregate(RDD.scala:1107)
at org.apache.spark.mllib.recommendation.MatrixFactorizationModel.countApproxDistinctUserProduct(MatrixFactorizationModel.scala:96)
at org.apache.spark.mllib.recommendation.MatrixFactorizationModel.predict(MatrixFactorizationModel.scala:126)
at com.aexp.cxp.recommendation.ProductRecommendationIndividual.main(ProductRecommendationIndividual.java:62)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:742)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)


Is there any configuration I need to set to load the model? Any suggestion would be a great help.

Answer

In Spark, as in any other distributed computing framework, it is important to understand where the code runs when you are trying to debug it. It is also important to have access to the various types of logs. For example, on YARN, you would have:

  • the master logs, if you record them yourself
  • the aggregated slave logs (thanks YARN, a useful feature!); see the sketch after this list for where executor-side log lines end up
  • the YARN node manager logs (these will, for example, tell you why a container was killed)
  • etc.
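
To make the master/slave split concrete for logging, here is a minimal sketch of where log lines end up (the RDD contents and logger names are made up for illustration; it reuses the jsc from the question):

import java.util.Arrays;
import org.apache.spark.api.java.JavaRDD;
import org.slf4j.LoggerFactory;

// Runs on the driver (master): this message goes to the driver's own log.
LoggerFactory.getLogger("driver").info("driver-side message");

JavaRDD<Integer> ids = jsc.parallelize(Arrays.asList(1, 2, 3));

// The lambda runs on the slaves: these messages land in the executor logs
// (the ones YARN aggregates for you), not in the driver's output.
// Fetching the logger inside the lambda also avoids serializing it.
ids.foreach(x -> LoggerFactory.getLogger("worker").info("processing {}", x));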

Digging into Spark issues can be quite time-consuming if you don't look in the right place from the start. Now, more specifically on this question: you have a clear stack trace, which is not always the case, so you should use it to your advantage.

The top of the stack trace is:

Exception in thread "main" java.io.IOException: Not a file:
maprfs:/op/tc/model/data
at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:324)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:199)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at ...

As you can see, the Spark job was executing a map operation when it failed. Who executes a map? The slaves. Therefore, you have to make sure your file is available on all slaves, not only on the master.
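
Before anything else, it is worth checking what the cluster filesystem actually sees at that path. Below is a minimal sketch of such a check (the probe itself is my suggestion, not part of the original code; it reuses the jsc from the question and the Hadoop FileSystem API):

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Use the same Hadoop configuration Spark hands to the slaves, so the
// path resolves against maprfs:// rather than the local disk.
FileSystem fs = FileSystem.get(jsc.hadoopConfiguration());

Path modelDir = new Path("/op/tc/model/");
System.out.println("model dir exists: " + fs.exists(modelDir));

// MatrixFactorizationModel.save writes a directory tree; listing it shows
// what the load call will actually try to read.
for (FileStatus status : fs.listStatus(modelDir)) {
    System.out.println(status.getPath() + " (dir: " + status.isDirectory() + ")");
}

If the directory is missing or empty from the cluster's point of view, the save step wrote to a place the slaves cannot reach (for example, a path that only exists on the master's local disk).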

More generally, you always need to make a clear distinction in your head between the code you are writing for the master and the code you are writing for the slaves. This will help you detect this kind of interaction, as well as references to non-serializable objects and other common mistakes.
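
As an illustration of the serialization point, here is a hypothetical, self-contained sketch (ClosureDemo, ScoreTable, and the app name are invented for the example):

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class ClosureDemo {

    // A plain class without Serializable: harmless on the master,
    // fatal as soon as a closure tries to ship it to the slaves.
    static class ScoreTable {
        double lookup(int id) { return id * 0.5; }
    }

    public static void main(String[] args) {
        JavaSparkContext jsc = new JavaSparkContext(
                new SparkConf().setAppName("closure-demo"));
        JavaRDD<Integer> ids = jsc.parallelize(Arrays.asList(1, 2, 3));

        ScoreTable table = new ScoreTable();

        // Fails with "Task not serializable": the lambda captures `table`,
        // which the slaves cannot receive because it is not Serializable.
        // ids.map(id -> table.lookup(id)).collect();

        // Works: extract a plain serializable value on the master first,
        // so only that value travels inside the closure.
        final double factor = 0.5;
        System.out.println(ids.map(id -> id * factor).collect());

        jsc.stop();
    }
}

The fix is always the same idea: decide on the master which plain, serializable values the slaves actually need, and let only those travel inside the closure.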
