D.Asare - 10 months ago
Java Question

Merge Multiple JavaRDD

I've attempted to merge multiple JavaRDDs, but only two of them end up merged. Can someone kindly help? I've been struggling with this for a while. Ultimately, I would like to obtain multiple collections, use sqlContext to create a group, and print out all the results.

Here is my code:

JavaRDD<AppLog> logs = mapCollection(sc, "mongodb://hadoopUser:Pocup1ne9@localhost:27017/hbdata.ppa_logs").union(
    mapCollection(sc, "mongodb://hadoopUser:Pocup1ne9@localhost:27017/hbdata.fav_logs").union(
        mapCollection(sc, "mongodb://hadoopUser:Pocup1ne9@localhost:27017/hbdata.pps_logs").union(
            mapCollection(sc, "mongodb://hadoopUser:Pocup1ne9@localhost:27017/hbdata.dd_logs").union(
                mapCollection(sc, "mongodb://hadoopUser:Pocup1ne9@localhost:27017/hbdata.ppt_logs")))));

public JavaRDD<AppLog> mapCollection(JavaSparkContext sc, String uri) {

    Configuration mongodbConfig = new Configuration();
    mongodbConfig.set("mongo.job.input.format", "com.mongodb.hadoop.MongoInputFormat");
    mongodbConfig.set("mongo.input.uri", uri);

    JavaPairRDD<Object, BSONObject> documents = sc.newAPIHadoopRDD(
        mongodbConfig,          // Configuration
        MongoInputFormat.class, // InputFormat: read from a live cluster
        Object.class,           // Key class
        BSONObject.class        // Value class
    );

    return documents.map(
        new Function<Tuple2<Object, BSONObject>, AppLog>() {

            public AppLog call(final Tuple2<Object, BSONObject> tuple) {
                AppLog log = new AppLog();
                BSONObject header = (BSONObject) tuple._2();

                log.setTarget((String) header.get("target"));
                log.setAction((String) header.get("action"));

                return log;
            }
        });
}
// printing the collections
SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);

DataFrame logsSchema = sqlContext.createDataFrame(logs, AppLog.class);
logsSchema.registerTempTable("logs"); // the table must be registered before it can be queried

DataFrame groupedMessages = sqlContext.sql(
    "select * from logs");
// "select target, action, Count(*) from logs group by target, action");

// "SELECT to, body FROM messages WHERE to = \"eric.bass@enron.com\"");

groupedMessages.show();



Answer Source

If you want to merge multiple JavaRDDs, simply use sc.union(rdd1, rdd2, ...) instead of rdd1.union(rdd2).union(rdd3).
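As a sketch of why this works: union simply concatenates its inputs, so a chain of pairwise unions and a single n-ary union produce the same merged collection. Since Spark itself isn't needed to demonstrate the semantics, the sketch below uses plain java.util.List values standing in for RDDs (the class name UnionDemo and the helper union are hypothetical stand-ins, not Spark API); in Spark 1.x the real call on a JavaSparkContext is sc.union(firstRdd, listOfRemainingRdds).

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class UnionDemo {

    // n-ary union: concatenates every input list in order,
    // analogous to sc.union(first, rest) flattening several RDDs at once.
    static List<String> union(List<List<String>> parts) {
        return parts.stream()
                .flatMap(List::stream)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Stand-ins for the per-collection RDDs (ppa_logs, fav_logs, pps_logs).
        List<String> ppa = Arrays.asList("ppa1", "ppa2");
        List<String> fav = Arrays.asList("fav1");
        List<String> pps = Arrays.asList("pps1");

        // Chained pairwise union, like rdd1.union(rdd2).union(rdd3):
        List<String> chained = Stream.concat(ppa.stream(),
                Stream.concat(fav.stream(), pps.stream()))
                .collect(Collectors.toList());

        // Single n-ary union, like sc.union(...):
        List<String> nary = union(Arrays.asList(ppa, fav, pps));

        System.out.println(chained.equals(nary)); // prints true
    }
}
```

The practical difference in Spark is not the result but the plan: one n-ary union builds a single UnionRDD, whereas chaining builds one nested union per call.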

Also check this: RDD.union vs SparkContext.union