Hunter Lin - 4 months ago
Scala Question

Spark cannot compile newAPIHadoopRDD with mongo-hadoop-connector's BSONFileInputFormat

I'm using the mongo-hadoop connector (r1.5.2) in Spark to read data from MongoDB and from BSON dump files, following this guide: https://github.com/mongodb/mongo-hadoop/wiki/Spark-Usage. So far I can read from MongoDB without problems. However, the BSON configuration does not even compile. Please help.
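For reference, the part that does work looks roughly like the wiki's example (a minimal sketch; the URI is a placeholder and sc is an existing SparkContext):

import org.apache.hadoop.conf.Configuration
import com.mongodb.hadoop.MongoInputFormat
import org.bson.BSONObject

val mongoConfig = new Configuration()
// Placeholder connection string; point this at your own database/collection.
mongoConfig.set("mongo.input.uri", "mongodb://localhost:27017/mydb.mycollection")

val mongoRDD = sc.newAPIHadoopRDD(
  mongoConfig,
  classOf[MongoInputFormat],  // declares its key/value types explicitly
  classOf[Object],
  classOf[BSONObject])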

My code in Scala:

// Assumed imports (the compiler error below shows the
// com.mongodb.hadoop.mapred variant of BSONFileInputFormat):
import com.mongodb.hadoop.mapred.BSONFileInputFormat
import org.bson.BSONObject

// dataConfig is a Hadoop Configuration; "path.bson" is a placeholder path.
dataConfig.set("mapred.input.dir", "path.bson")

val documents = sc.newAPIHadoopRDD(
  dataConfig,
  classOf[BSONFileInputFormat],
  classOf[Object],
  classOf[BSONObject])


Error:

Error:(56, 24) inferred type arguments [Object,org.bson.BSONObject,com.mongodb.hadoop.mapred.BSONFileInputFormat] do not conform to method newAPIHadoopRDD's type parameter bounds [K,V,F <: org.apache.hadoop.mapreduce.InputFormat[K,V]]
val documents = sc.newAPIHadoopRDD(
^

Answer

I found the solution! The problem seems to be caused by the generics of the InputFormat class.

newAPIHadoopRDD requires the input format type parameter F to satisfy the bound

F <: org.apache.hadoop.mapreduce.InputFormat[K,V]
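For context, the corresponding SparkContext method looks roughly like this (paraphrased from the Spark 1.x scaladoc):

def newAPIHadoopRDD[K, V, F <: org.apache.hadoop.mapreduce.InputFormat[K, V]](
    conf: Configuration,
    fClass: Class[F],
    kClass: Class[K],
    vClass: Class[V]): RDD[(K, V)]

So K and V have to be inferable from the Class arguments you pass in; when they are not, you get exactly the bound-conformance error shown above.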

Although BSONFileInputFormat extends FileInputFormat[K,V], which in turn extends InputFormat[K,V], it never fixes K and V to Object and BSONObject. (In fact, no K,V type arguments appear in BSONFileInputFormat at all; it extends the raw FileInputFormat type. That is legal in Java, which is why the class itself compiles, but it leaves the Scala compiler with nothing from which to infer K and V.)

Anyway, the solution is to cast the BSONFileInputFormat class to a subclass of FileInputFormat with K and V pinned down (Class.asSubclass performs a checked cast on the Class object):

val documents = sc.newAPIHadoopRDD(
  dataConfig,
  classOf[BSONFileInputFormat].asSubclass(
    classOf[org.apache.hadoop.mapreduce.lib.input.FileInputFormat[Object, BSONObject]]),
  classOf[Object],
  classOf[BSONObject])
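As a quick smoke test (illustrative only), you can pull a few records and print them:

documents.take(5).foreach { case (_, doc) =>
  println(doc)  // doc is an org.bson.BSONObject
}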

Now it works without any problem :)