Daniel Zendejas Daniel Zendejas - 1 year ago 222
Scala Question

Cannot connect to remote MongoDB from EMR cluster with spark-shell

I'm trying to connect to a remote Mongo database from a EMR cluster. The following code is executed with the command

spark-shell --packages com.stratio.datasource:spark-mongodb_2.10:0.11.2

import com.stratio.datasource.mongodb._
import com.stratio.datasource.mongodb.config._
import com.stratio.datasource.mongodb.config.MongodbConfig._

val builder = MongodbConfigBuilder(Map(Host -> List("[IP.OF.REMOTE.HOST]:3001"), Database -> "meteor", Collection ->"my_target_collection", ("user", "user_name"), ("database", "meteor"), ("password", "my_password")))
val readConfig = builder.build()
val mongoRDD = sqlContext.fromMongoDB(readConfig)

Spark-shell responds with the following error:

16/07/26 15:44:35 INFO SparkContext: Starting job: aggregate at MongodbSchema.scala:47
16/07/26 15:44:45 WARN DAGScheduler: Creating new stage failed due to exception - job: 1
com.mongodb.MongoTimeoutException: Timed out after 10000 ms while waiting to connect. Client view of cluster state is {type=Unknown, servers=[{address=[IP.OF.REMOTE.HOST]:3001, type=Unknown, state=Connecting, exception={java.lang.IllegalArgumentException: response too long: 1347703880}}]
at com.mongodb.BaseCluster.getDescription(BaseCluster.java:128)
at com.mongodb.DBTCPConnector.getClusterDescription(DBTCPConnector.java:394)
at com.mongodb.DBTCPConnector.getType(DBTCPConnector.java:571)
at com.mongodb.DBTCPConnector.getReplicaSetStatus(DBTCPConnector.java:362)
at com.mongodb.Mongo.getReplicaSetStatus(Mongo.java:446)

After reading for a while, a few responses here in SO and other forums state that the
java.lang.IllegalArgumentException: response too long: 1347703880
error might be caused by a faulty Mongo driver. Based on that I started executing spark-shell with updated drivers like so:

spark-shell --packages com.stratio.datasource:spark-mongodb_2.10:0.11.2 --jars casbah-commons_2.10-3.1.1.jar,casbah-core_2.10-3.1.1.jar,casbah-query_2.10-3.1.1ja.jar,mongo-java-driver-2.13.0.jar

Of course before this I downloaded the jars and stored them in the same route as the spark-shell was executed. Nonetheless, with this approach spark-shell answers with the following cryptic error message:

Exception in thread "dag-scheduler-event-loop" java.lang.NoClassDefFoundError: com/mongodb/casbah/query/dsl/CurrentDateOp
at com.mongodb.casbah.MongoClient.apply(MongoClient.scala:218)
at com.stratio.datasource.mongodb.partitioner.MongodbPartitioner.isShardedCollection(MongodbPartitioner.scala:78)

It is worth mentioning that the target MongoDB is a Meteor Mongo database, that's why I'm trying to connect with
instead of using the port

What might be the issue? I've followed many tutorials but all of them seem to have the MongoDB in the same host, allowing them to declare
in the credentials. Is there something I'm missing?

Thanks for the help!

Answer Source

I ended up using MongoDB's official Java driver instead. This was my first experience with Spark and the Scala programming language, so I wasn't very familiar with the idea of using plain Java JARs yet.

The solution

I downloaded the necessary JARs and stored them in the same directory as the job file, which is a Scala file. So the directory looked something like:


Then, I start spark-shell as follows to load the JARs and its classes into the shell environment:

spark-shell --jars "mongodb-driver-3.0.1.jar,mongodb-driver-core-3.0.1.jar,bson-3.0.1.jar"

Next, I execute the following to load the source code of the job into the spark-shell:

:load job.scala

Finally I execute the main object in my job like so:


As of the code inside the MainObject, it is merely as the tutorial states:

val mongo = new MongoClient(IP_OF_REMOTE_MONGO , 27017)
val db = mongo.getDB(DB_NAME)

Hopefully this will help future readers and spark-shell/Scala beginners!

Recommended from our users: Dynamic Network Monitoring from WhatsUp Gold from IPSwitch. Free Download