dyltini - 3 months ago
Java Question

Update collection in MongoDb via Apache Spark using Mongo-Hadoop connector

I would like to update a specific collection in MongoDb via Spark in Java.
I am using the MongoDB Connector for Hadoop to retrieve and save information from Apache Spark to MongoDb in Java.

After following Sampo Niskanen's excellent post regarding retrieving and saving collections to MongoDb via Spark, I got stuck with updating collections.

MongoOutputFormat.java includes a constructor taking String[] updateKeys, which I am guessing refers to a list of keys to compare against existing documents in order to perform an update. However, when using Spark's saveAsNewAPIHadoopFile() method with MongoOutputFormat.class as a parameter, I am wondering how to use that update constructor.

save.saveAsNewAPIHadoopFile("file:///bogus", Object.class, Object.class, MongoOutputFormat.class, config);


Prior to this, MongoUpdateWritable.java was used to perform collection updates. From Hadoop examples I've seen, this is normally set on mongo.job.output.value, perhaps like this in Spark:

save.saveAsNewAPIHadoopFile("file:///bogus", Object.class, MongoUpdateWritable.class, MongoOutputFormat.class, config);
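For reference, the Hadoop Configuration passed into these calls is typically built along these lines (a minimal sketch; the connection string below is a placeholder for your own deployment, and mongo.output.uri is the standard mongo-hadoop key naming the target collection):

```java
import org.apache.hadoop.conf.Configuration;

// Minimal config object handed to saveAsNewAPIHadoopFile().
// mongo.output.uri points at the database and collection to write to.
Configuration config = new Configuration();
config.set("mongo.output.uri", "mongodb://localhost:27017/mydb.mycollection");
```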


However, I'm still wondering how to specify the update keys in MongoUpdateWritable.java.

Admittedly, as a hacky workaround, I've set the _id of the object from my document's key values, so that when a save is performed the collection overwrites any existing document having the same _id:

JavaPairRDD<BSONObject, ?> analyticsResult; // JavaPairRDD of (mongoObject, result)
JavaPairRDD<Object, BSONObject> save = analyticsResult.mapToPair(s -> {
    BSONObject o = s._1;

    // For each key, append "key:value_" to build a composite _id
    String id = "";
    for (String key : o.keySet()) {
        id += key + ":" + o.get(key) + "_";
    }
    o.put("_id", id);

    o.put("result", s._2);
    return new Tuple2<>(null, o);
});

save.saveAsNewAPIHadoopFile("file:///bogus", Object.class, Object.class, MongoOutputFormat.class, config);
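One caveat of this workaround: the generated _id depends on the iteration order of o.keySet(), which is not guaranteed to be stable, so the same document could produce different ids. A small self-contained sketch of the id-building step with the keys sorted first (a plain Map stands in for BSONObject here purely so the snippet runs on its own):

```java
import java.util.Map;
import java.util.TreeMap;

public class CompositeId {

    // Builds a composite id of the form "key1:value1_key2:value2_",
    // sorting the keys first so the result is deterministic.
    static String buildId(Map<String, ?> doc) {
        StringBuilder id = new StringBuilder();
        for (Map.Entry<String, ?> e : new TreeMap<>(doc).entrySet()) {
            id.append(e.getKey()).append(':').append(e.getValue()).append('_');
        }
        return id.toString();
    }

    public static void main(String[] args) {
        Map<String, Object> doc = new TreeMap<>();
        doc.put("user", "alice");
        doc.put("day", "2014-11-09");
        System.out.println(buildId(doc)); // day:2014-11-09_user:alice_
    }
}
```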


I would like to perform the MongoDB collection update via Spark using MongoOutputFormat, MongoUpdateWritable, or Configuration, ideally using the saveAsNewAPIHadoopFile() method. Is this possible? If not, is there another way that does not involve specifically setting _id to the key values I want to update on?

Answer

I tried several combinations of config.set("mongo.job.output.value", "....") and several variations of

.saveAsNewAPIHadoopFile(
        "file:///bogus",
        classOf[Any],
        classOf[Any],
        classOf[com.mongodb.hadoop.MongoOutputFormat[Any, Any]],
        mongo_config
      )

and none of them worked.

I got it to work by using the MongoUpdateWritable class as the output value of my map method:

items.map(row => {
      val mongo_id = new ObjectId(row("id").toString)

      // Query: match the document by _id
      val query = new BasicBSONObject()
      query.append("_id", mongo_id)

      // Update: $set a single field to its new value
      val update = new BasicBSONObject()
      update.append("$set", new BasicBSONObject().append("field_name", row("new_value")))

      // upsert = false, multiUpdate = true
      val muw = new MongoUpdateWritable(query, update, false, true)
      (null, muw)
    })
     .saveAsNewAPIHadoopFile(
       "file:///bogus",
       classOf[Any],
       classOf[Any],
       classOf[com.mongodb.hadoop.MongoOutputFormat[Any, Any]],
       mongo_config
     )

The raw query executed in Mongo then looks something like this:

2014-11-09T13:32:11.609-0800 [conn438] update db.users query: { _id: ObjectId('5436edd3e4b051de6a505af9') } update: { $set: { value: 10 } } nMatched:1 nModified:0 keyUpdates:0 numYields:0 locks(micros) w:24 3ms
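Since the question is asked in Java, the same approach translates roughly as follows. This is a sketch only, not tested against a live cluster: rows, config, and the field names "id", "field_name", and "new_value" are placeholders mirroring the Scala answer above.

```java
import org.apache.spark.api.java.JavaPairRDD;
import org.bson.BasicBSONObject;
import org.bson.types.ObjectId;
import com.mongodb.hadoop.MongoOutputFormat;
import com.mongodb.hadoop.io.MongoUpdateWritable;
import scala.Tuple2;

// rows: a JavaRDD<Map<String, Object>> (or similar) of documents to update
JavaPairRDD<Object, MongoUpdateWritable> updates = rows.mapToPair(row -> {
    // Query: match the existing document by its _id
    BasicBSONObject query = new BasicBSONObject();
    query.append("_id", new ObjectId(row.get("id").toString()));

    // Update: $set a single field to its new value
    BasicBSONObject update = new BasicBSONObject();
    update.append("$set", new BasicBSONObject().append("field_name", row.get("new_value")));

    // upsert = false, multiUpdate = true
    return new Tuple2<>(null, new MongoUpdateWritable(query, update, false, true));
});

updates.saveAsNewAPIHadoopFile(
    "file:///bogus",
    Object.class,
    MongoUpdateWritable.class,
    MongoOutputFormat.class,
    config);
```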