Antonín Kučera Antonín Kučera - 1 month ago 25
Scala Question

Spark Connector MongoDB - Python API

I'd like pull data from Mongo by Spark, especially by PySpark..
I have found official guide from Mongo https://docs.mongodb.com/spark-connector/python-api/

I have all prerequisites:


  • Scala 2.11.8

  • Spark 1.6.2

  • MongoDB 3.0.8 (not on same device where is Spark)

    $ pyspark --conf
    "spark.mongodb.input.uri=mongodb://mongo1:27019/xxx.xxx?readPreference=primaryPreferred"
    --packages org.mongodb.spark:mongo-spark-connector_2.11:1.1.0



and PySpark showed me this:

Python 3.4.2 (default, Oct 8 2014, 10:45:20)
[GCC 4.9.1] on linux
Type "help", "copyright", "credits" or "license" for more information.
Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
:: loading settings :: url = jar:file:/usr/local/spark/lib/spark-assembly-1.6.2-hadoop2.6.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
org.mongodb.spark#mongo-spark-connector_2.11 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent;1.0
confs: [default]
found org.mongodb.spark#mongo-spark-connector_2.11;1.1.0 in central
found org.mongodb#mongo-java-driver;3.2.2 in central
:: resolution report :: resolve 280ms :: artifacts dl 6ms
:: modules in use:
org.mongodb#mongo-java-driver;3.2.2 from central in [default]
org.mongodb.spark#mongo-spark-connector_2.11;1.1.0 from central in [default]
---------------------------------------------------------------------
| | modules || artifacts |
| conf | number| search|dwnlded|evicted|| number|dwnlded|
---------------------------------------------------------------------
| default | 2 | 0 | 0 | 0 || 2 | 0 |
---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent
confs: [default]
0 artifacts copied, 2 already retrieved (0kB/9ms)
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
16/10/12 16:35:46 INFO SparkContext: Running Spark version 1.6.2
16/10/12 16:35:47 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/10/12 16:35:47 INFO SecurityManager: Changing view acls to: root
16/10/12 16:35:47 INFO SecurityManager: Changing modify acls to: root
16/10/12 16:35:47 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); users with modify permissions: Set(root)
16/10/12 16:35:47 INFO Utils: Successfully started service 'sparkDriver' on port 35485.
16/10/12 16:35:48 INFO Slf4jLogger: Slf4jLogger started
16/10/12 16:35:48 INFO Remoting: Starting remoting
16/10/12 16:35:48 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriverActorSystem@192.168.28.194:39860]
16/10/12 16:35:48 INFO Utils: Successfully started service 'sparkDriverActorSystem' on port 39860.
16/10/12 16:35:48 INFO SparkEnv: Registering MapOutputTracker
16/10/12 16:35:48 INFO SparkEnv: Registering BlockManagerMaster
16/10/12 16:35:48 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-1e9185bd-fd1a-4d36-8c7e-9b6430e9f5c6
16/10/12 16:35:48 INFO MemoryStore: MemoryStore started with capacity 511.1 MB
16/10/12 16:35:48 INFO SparkEnv: Registering OutputCommitCoordinator
16/10/12 16:35:48 INFO Utils: Successfully started service 'SparkUI' on port 4040.
16/10/12 16:35:48 INFO SparkUI: Started SparkUI at http://192.168.28.194:4040
16/10/12 16:35:48 INFO HttpFileServer: HTTP File server directory is /tmp/spark-c1d32422-8241-411f-a751-e01e5f6e2b5c/httpd-d62ed1b8-e4ab-4891-9b61-5f0f5ae7eb6e
16/10/12 16:35:48 INFO HttpServer: Starting HTTP Server
16/10/12 16:35:48 INFO Utils: Successfully started service 'HTTP file server' on port 34716.
16/10/12 16:35:48 INFO SparkContext: Added JAR file:/root/.ivy2/jars/org.mongodb.spark_mongo-spark-connector_2.11-1.1.0.jar at http://192.168.28.194:34716/jars/org.mongodb.spark_mongo-spark-connector_2.11-1.1.0.jar with timestamp 1476282948892
16/10/12 16:35:48 INFO SparkContext: Added JAR file:/root/.ivy2/jars/org.mongodb_mongo-java-driver-3.2.2.jar at http://192.168.28.194:34716/jars/org.mongodb_mongo-java-driver-3.2.2.jar with timestamp 1476282948898
16/10/12 16:35:49 INFO Utils: Copying /root/.ivy2/jars/org.mongodb.spark_mongo-spark-connector_2.11-1.1.0.jar to /tmp/spark-c1d32422-8241-411f-a751-e01e5f6e2b5c/userFiles-549541b8-aaba-4444-b2eb-438baa7e82e8/org.mongodb.spark_mongo-spark-connector_2.11-1.1.0.jar
16/10/12 16:35:49 INFO SparkContext: Added file file:/root/.ivy2/jars/org.mongodb.spark_mongo-spark-connector_2.11-1.1.0.jar at file:/root/.ivy2/jars/org.mongodb.spark_mongo-spark-connector_2.11-1.1.0.jar with timestamp 1476282949018
16/10/12 16:35:49 INFO Utils: Copying /root/.ivy2/jars/org.mongodb_mongo-java-driver-3.2.2.jar to /tmp/spark-c1d32422-8241-411f-a751-e01e5f6e2b5c/userFiles-549541b8-aaba-4444-b2eb-438baa7e82e8/org.mongodb_mongo-java-driver-3.2.2.jar
16/10/12 16:35:49 INFO SparkContext: Added file file:/root/.ivy2/jars/org.mongodb_mongo-java-driver-3.2.2.jar at file:/root/.ivy2/jars/org.mongodb_mongo-java-driver-3.2.2.jar with timestamp 1476282949029
16/10/12 16:35:49 INFO Executor: Starting executor ID driver on host localhost
16/10/12 16:35:49 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 43448.
16/10/12 16:35:49 INFO NettyBlockTransferService: Server created on 43448
16/10/12 16:35:49 INFO BlockManagerMaster: Trying to register BlockManager
16/10/12 16:35:49 INFO BlockManagerMasterEndpoint: Registering block manager localhost:43448 with 511.1 MB RAM, BlockManagerId(driver, localhost, 43448)
16/10/12 16:35:49 INFO BlockManagerMaster: Registered BlockManager
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 1.6.2
/_/

Using Python version 3.4.2 (default, Oct 8 2014 10:45:20)
SparkContext available as sc, HiveContext available as sqlContext.


then i put this code:

df = sqlContext.read.format("com.mongodb.spark.sql.DefaultSource").load()


and there was this:

16/10/12 16:40:51 INFO HiveContext: Initializing execution hive, version 1.2.1
16/10/12 16:40:51 INFO ClientWrapper: Inspected Hadoop version: 2.6.0
16/10/12 16:40:51 INFO ClientWrapper: Loaded org.apache.hadoop.hive.shims.Hadoop23Shims for Hadoop version 2.6.0
16/10/12 16:40:51 INFO HiveMetaStore: 0: Opening raw store with implemenation class:org.apache.hadoop.hive.metastore.ObjectStore
16/10/12 16:40:51 INFO ObjectStore: ObjectStore, initialize called
16/10/12 16:40:51 INFO Persistence: Property hive.metastore.integral.jdo.pushdown unknown - will be ignored
16/10/12 16:40:51 INFO Persistence: Property datanucleus.cache.level2 unknown - will be ignored
16/10/12 16:40:51 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
16/10/12 16:40:51 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
16/10/12 16:40:53 INFO ObjectStore: Setting MetaStore object pin classes with hive.metastore.cache.pinobjtypes="Table,StorageDescriptor,SerDeInfo,Partition,Database,Type,FieldSchema,Order"
16/10/12 16:40:53 INFO Datastore: The class "org.apache.hadoop.hive.metastore.model.MFieldSchema" is tagged as "embedded-only" so does not have its own datastore table.
16/10/12 16:40:53 INFO Datastore: The class "org.apache.hadoop.hive.metastore.model.MOrder" is tagged as "embedded-only" so does not have its own datastore table.
16/10/12 16:40:54 INFO Datastore: The class "org.apache.hadoop.hive.metastore.model.MFieldSchema" is tagged as "embedded-only" so does not have its own datastore table.
16/10/12 16:40:54 INFO Datastore: The class "org.apache.hadoop.hive.metastore.model.MOrder" is tagged as "embedded-only" so does not have its own datastore table.
16/10/12 16:40:54 INFO MetaStoreDirectSql: Using direct SQL, underlying DB is DERBY
16/10/12 16:40:54 INFO ObjectStore: Initialized ObjectStore
16/10/12 16:40:55 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0
16/10/12 16:40:55 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException
16/10/12 16:40:55 INFO HiveMetaStore: Added admin role in metastore
16/10/12 16:40:55 INFO HiveMetaStore: Added public role in metastore
16/10/12 16:40:55 INFO HiveMetaStore: No user is added in admin role, since config is empty
16/10/12 16:40:55 INFO HiveMetaStore: 0: get_all_databases
16/10/12 16:40:55 INFO audit: ugi=root ip=unknown-ip-addr cmd=get_all_databases
16/10/12 16:40:55 INFO HiveMetaStore: 0: get_functions: db=default pat=*
16/10/12 16:40:55 INFO audit: ugi=root ip=unknown-ip-addr cmd=get_functions: db=default pat=*
16/10/12 16:40:55 INFO Datastore: The class "org.apache.hadoop.hive.metastore.model.MResourceUri" is tagged as "embedded-only" so does not have its own datastore table.
16/10/12 16:40:55 INFO SessionState: Created local directory: /tmp/8733297b-e0d2-49cf-8557-62c8c4e7cc4a_resources
16/10/12 16:40:55 INFO SessionState: Created HDFS directory: /tmp/hive/root/8733297b-e0d2-49cf-8557-62c8c4e7cc4a
16/10/12 16:40:55 INFO SessionState: Created local directory: /tmp/root/8733297b-e0d2-49cf-8557-62c8c4e7cc4a
16/10/12 16:40:55 INFO SessionState: Created HDFS directory: /tmp/hive/root/8733297b-e0d2-49cf-8557-62c8c4e7cc4a/_tmp_space.db
16/10/12 16:40:55 INFO HiveContext: default warehouse location is /user/hive/warehouse
16/10/12 16:40:55 INFO HiveContext: Initializing HiveMetastoreConnection version 1.2.1 using Spark classes.
16/10/12 16:40:55 INFO ClientWrapper: Inspected Hadoop version: 2.6.0
16/10/12 16:40:55 INFO ClientWrapper: Loaded org.apache.hadoop.hive.shims.Hadoop23Shims for Hadoop version 2.6.0
16/10/12 16:40:56 INFO HiveMetaStore: 0: Opening raw store with implemenation class:org.apache.hadoop.hive.metastore.ObjectStore
16/10/12 16:40:56 INFO ObjectStore: ObjectStore, initialize called
16/10/12 16:40:56 INFO Persistence: Property hive.metastore.integral.jdo.pushdown unknown - will be ignored
16/10/12 16:40:56 INFO Persistence: Property datanucleus.cache.level2 unknown - will be ignored
16/10/12 16:40:56 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
16/10/12 16:40:56 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
16/10/12 16:40:57 INFO ObjectStore: Setting MetaStore object pin classes with hive.metastore.cache.pinobjtypes="Table,StorageDescriptor,SerDeInfo,Partition,Database,Type,FieldSchema,Order"
16/10/12 16:40:58 INFO Datastore: The class "org.apache.hadoop.hive.metastore.model.MFieldSchema" is tagged as "embedded-only" so does not have its own datastore table.
16/10/12 16:40:58 INFO Datastore: The class "org.apache.hadoop.hive.metastore.model.MOrder" is tagged as "embedded-only" so does not have its own datastore table.
16/10/12 16:40:59 INFO Datastore: The class "org.apache.hadoop.hive.metastore.model.MFieldSchema" is tagged as "embedded-only" so does not have its own datastore table.
16/10/12 16:40:59 INFO Datastore: The class "org.apache.hadoop.hive.metastore.model.MOrder" is tagged as "embedded-only" so does not have its own datastore table.
16/10/12 16:40:59 INFO Query: Reading in results for query "org.datanucleus.store.rdbms.query.SQLQuery@0" since the connection used is closing
16/10/12 16:40:59 INFO MetaStoreDirectSql: Using direct SQL, underlying DB is DERBY
16/10/12 16:40:59 INFO ObjectStore: Initialized ObjectStore
16/10/12 16:40:59 INFO HiveMetaStore: Added admin role in metastore
16/10/12 16:40:59 INFO HiveMetaStore: Added public role in metastore
16/10/12 16:40:59 INFO HiveMetaStore: No user is added in admin role, since config is empty
16/10/12 16:40:59 INFO HiveMetaStore: 0: get_all_databases
16/10/12 16:40:59 INFO audit: ugi=root ip=unknown-ip-addr cmd=get_all_databases
16/10/12 16:40:59 INFO HiveMetaStore: 0: get_functions: db=default pat=*
16/10/12 16:40:59 INFO audit: ugi=root ip=unknown-ip-addr cmd=get_functions: db=default pat=*
16/10/12 16:40:59 INFO Datastore: The class "org.apache.hadoop.hive.metastore.model.MResourceUri" is tagged as "embedded-only" so does not have its own datastore table.
16/10/12 16:40:59 INFO SessionState: Created local directory: /tmp/cc4f12a5-e5b2-4a22-a240-04e1ca3727ae_resources
16/10/12 16:40:59 INFO SessionState: Created HDFS directory: /tmp/hive/root/cc4f12a5-e5b2-4a22-a240-04e1ca3727ae
16/10/12 16:40:59 INFO SessionState: Created local directory: /tmp/root/cc4f12a5-e5b2-4a22-a240-04e1ca3727ae
16/10/12 16:40:59 INFO SessionState: Created HDFS directory: /tmp/hive/root/cc4f12a5-e5b2-4a22-a240-04e1ca3727ae/_tmp_space.db
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/usr/local/spark/python/pyspark/sql/readwriter.py", line 139, in load
return self._df(self._jreader.load())
File "/usr/local/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in __call__
File "/usr/local/spark/python/pyspark/sql/utils.py", line 45, in deco
return f(*a, **kw)
File "/usr/local/spark/python/lib/py4j-0.9-src.zip/py4j/protocol.py", line 308, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o24.load.
: java.lang.NoSuchMethodError: scala.Predef$.$conforms()Lscala/Predef$$less$colon$less;
at com.mongodb.spark.config.MongoCompanionConfig$class.getOptionsFromConf(MongoCompanionConfig.scala:209)
at com.mongodb.spark.config.ReadConfig$.getOptionsFromConf(ReadConfig.scala:39)
at com.mongodb.spark.config.MongoCompanionConfig$class.apply(MongoCompanionConfig.scala:101)
at com.mongodb.spark.config.ReadConfig$.apply(ReadConfig.scala:39)
at com.mongodb.spark.sql.DefaultSource.createRelation(DefaultSource.scala:67)
at com.mongodb.spark.sql.DefaultSource.createRelation(DefaultSource.scala:50)
at com.mongodb.spark.sql.DefaultSource.createRelation(DefaultSource.scala:36)
at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:158)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:119)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:209)
at java.lang.Thread.run(Thread.java:745)


I have tried a lot of possible option to pull data from Mongo by Spark..Any tips?

Answer

It looks like an error I'd expect to see if I were using code compiled in a different version of Scala. Have you tried running it with --packages org.mongodb.spark:mongo-spark-connector_2.10:1.1.0?

By default in Spark 1.6.x is compiled against Scala 2.10 and you have to manually build it for Scala 2.11 like so:

./dev/change-scala-version.sh 2.11
mvn -Pyarn -Phadoop-2.4 -Dscala-2.11 -DskipTests clean package