Dhivya Narayanasamy Dhivya Narayanasamy - 3 years ago 176
Scala Question

Querying Cassandra data using Spark SQL in Scala

I am trying to Query Cassandra data using Spark SQL in Scala.

import com.datastax.spark.connector._
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

val conf= new SparkConf(true)
.set("spark.cassandra.connection.host","**.*.**.***")
.set("spark.cassandra.auth.username","****")
.set("spark.cassandra.auth.password","****")
val sc = new SparkContext(conf)

import org.apache.spark.sql._
val sqlContext = new SQLContext(sc)
sqlContext.sql("SELECT * FROM energydata.demodata")


And it throws error:


org.apache.spark.sql.AnalysisException: Table or view not found:
energydata
.
d emodata
; line 1 pos 14; 'Project [*]
+- 'UnresolvedRelation
energydata
.
demodata


at
org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis
(package.scala:42) at
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis
$1.apply(CheckAnalysis.scala:82) at
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis
$1.apply(CheckAnalysis.scala:78) at
org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(Tre
eNode.scala:126) at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(Tre
eNode.scala:126) at
scala.collection.immutable.List.foreach(List.scala:381) at
org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
at
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(Ch
eckAnalysis.scala:78) at
org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scal
a:91) at
org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution
.scala:52) at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:66)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:623) at
org.apache.spark.sql.SQLContext.sql(SQLContext.scala:691) ... 54
elided


I just want to read table data without disturbing the cassandra Table. I tried this solution given here to add
hive-site.xml
file to
spark/conf
. But when i add this to
spark/conf
, it seems that spark is not working properly.

at org.apache.spark.sql.SparkSession$Builder$$anonfun$getOrCreate$5.appl
y(SparkSession.scala:938)
at org.apache.spark.sql.SparkSession$Builder$$anonfun$getOrCreate$5.appl
y(SparkSession.scala:938)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.sca
la:99)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.sca
la:99)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala
:230)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.sc
ala:938)
at org.apache.spark.repl.Main$.createSparkSession(Main.scala:97)
at $line3.$read$$iw$$iw.<init>(<console>:15)
at $line3.$read$$iw.<init>(<console>:42)
at $line3.$read.<init>(<console>:44)
at $line3.$read$.<init>(<console>:48)
at $line3.$read$.<clinit>(<console>)
at $line3.$eval$.$print$lzycompute(<console>:7)
at $line3.$eval$.$print(<console>:6)
at $line3.$eval.$print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:786)

at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:1047
)
at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunR
eq$1.apply(IMain.scala:638)
at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunR
eq$1.apply(IMain.scala:637)
at scala.reflect.internal.util.ScalaClassLoader$class.asContext(ScalaCla
ssLoader.scala:31)
at scala.reflect.internal.util.AbstractFileClassLoader.asContext(Abstrac
tFileClassLoader.scala:19)
at scala.tools.nsc.interpreter.IMain$WrappedRequest.loadAndRunReq(IMain.
scala:637)
at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:569)
at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:565)
at scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:8
07)
at scala.tools.nsc.interpreter.ILoop.command(ILoop.scala:681)
at scala.tools.nsc.interpreter.ILoop.processLine(ILoop.scala:395)
at org.apache.spark.repl.SparkILoop$$anonfun$initializeSpark$1.apply$mcV
$sp(SparkILoop.scala:38)
at org.apache.spark.repl.SparkILoop$$anonfun$initializeSpark$1.apply(Spa
rkILoop.scala:37)
at org.apache.spark.repl.SparkILoop$$anonfun$initializeSpark$1.apply(Spa
rkILoop.scala:37)
at scala.tools.nsc.interpreter.IMain.beQuietDuring(IMain.scala:214)
at org.apache.spark.repl.SparkILoop.initializeSpark(SparkILoop.scala:37)

at org.apache.spark.repl.SparkILoop.loadFiles(SparkILoop.scala:98)
at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply$mcZ$sp(ILo
op.scala:920)
at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scal
a:909)
at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scal
a:909)
at scala.reflect.internal.util.ScalaClassLoader$.savingContextLoader(Sca
laClassLoader.scala:97)
at scala.tools.nsc.interpreter.ILoop.process(ILoop.scala:909)
at org.apache.spark.repl.Main$.doMain(Main.scala:70)
at org.apache.spark.repl.Main$.main(Main.scala:53)
at org.apache.spark.repl.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSub
mit$$runMain(SparkSubmit.scala:755)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:18
0)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.h
ive.ql.metadata.SessionHiveMetaStoreClient
at org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStore
Utils.java:1523)
at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.<init>(Retry
ingMetaStoreClient.java:86)
at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(Ret
ryingMetaStoreClient.java:132)
at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(Ret
ryingMetaStoreClient.java:104)
at org.apache.hadoop.hive.ql.metadata.Hive.createMetaStoreClient(Hive.ja
va:3005)
at org.apache.hadoop.hive.ql.metadata.Hive.getMSC(Hive.java:3024)
at org.apache.hadoop.hive.ql.metadata.Hive.getAllDatabases(Hive.java:123
4)
... 87 more
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)

at sun.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source)

at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Sou
rce)
at java.lang.reflect.Constructor.newInstance(Unknown Source)
at org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStore
Utils.java:1521)
... 93 more
Caused by: java.lang.IllegalArgumentException: java.net.URISyntaxException: Rela
tive path in absolute URI: file:$%7Btest.warehouse.dir%7D
at org.apache.hadoop.fs.Path.initialize(Path.java:205)
at org.apache.hadoop.fs.Path.<init>(Path.java:196)
at org.apache.hadoop.hive.metastore.Warehouse.getDnsPath(Warehouse.java:
141)
at org.apache.hadoop.hive.metastore.Warehouse.getDnsPath(Warehouse.java:
146)
at org.apache.hadoop.hive.metastore.Warehouse.getWhRoot(Warehouse.java:1
59)
at org.apache.hadoop.hive.metastore.Warehouse.getDefaultDatabasePath(War
ehouse.java:177)
at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.createDefau
ltDB_core(HiveMetaStore.java:600)
at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.createDefau
ltDB(HiveMetaStore.java:620)
at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.init(HiveMe
taStore.java:461)
at org.apache.hadoop.hive.metastore.RetryingHMSHandler.<init>(RetryingHM
SHandler.java:66)
at org.apache.hadoop.hive.metastore.RetryingHMSHandler.getProxy(Retrying
HMSHandler.java:72)
at org.apache.hadoop.hive.metastore.HiveMetaStore.newRetryingHMSHandler(
HiveMetaStore.java:5762)
at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.<init>(HiveMetaS
toreClient.java:199)
at org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient.<init>(
SessionHiveMetaStoreClient.java:74)
... 98 more
Caused by: java.net.URISyntaxException: Relative path in absolute URI: file:$%7B
test.warehouse.dir%7D
at java.net.URI.checkPath(Unknown Source)
at java.net.URI.<init>(Unknown Source)
at org.apache.hadoop.fs.Path.initialize(Path.java:202)
... 111 more
17/07/26 11:40:06 WARN ObjectStore: Failed to get database default, returning No
SuchObjectException
java.lang.IllegalArgumentException: Error while instantiating 'org.apache.spark.
sql.hive.HiveSessionStateBuilder':
at org.apache.spark.sql.SparkSession$.org$apache$spark$sql$SparkSession$$insta
ntiateSessionState(SparkSession.scala:1053)
at org.apache.spark.sql.SparkSession$$anonfun$sessionState$2.apply(SparkSessio
n.scala:130)
at org.apache.spark.sql.SparkSession$$anonfun$sessionState$2.apply(SparkSessio
n.scala:130)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scal
a:129)
at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:126)
at org.apache.spark.sql.SparkSession$Builder$$anonfun$getOrCreate$5.apply(Spar
kSession.scala:938)
at org.apache.spark.sql.SparkSession$Builder$$anonfun$getOrCreate$5.apply(Spar
kSession.scala:938)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)

at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)

at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:93
8)
at org.apache.spark.repl.Main$.createSparkSession(Main.scala:97)
... 47 elided
Caused by: org.apache.spark.sql.AnalysisException: java.lang.RuntimeException: j
ava.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.ql.metad
ata.SessionHiveMetaStoreClient;
at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalo
g.scala:106)
at org.apache.spark.sql.hive.HiveExternalCatalog.databaseExists(HiveExternalCa
talog.scala:193)
at org.apache.spark.sql.internal.SharedState.externalCatalog$lzycompute(Shared
State.scala:105)
at org.apache.spark.sql.internal.SharedState.externalCatalog(SharedState.scala
:93)
at org.apache.spark.sql.hive.HiveSessionStateBuilder.externalCatalog(HiveSessi
onStateBuilder.scala:39)
at org.apache.spark.sql.hive.HiveSessionStateBuilder.catalog$lzycompute(HiveSe
ssionStateBuilder.scala:54)
at org.apache.spark.sql.hive.HiveSessionStateBuilder.catalog(HiveSessionStateB
uilder.scala:52)
at org.apache.spark.sql.hive.HiveSessionStateBuilder.catalog(HiveSessionStateB
uilder.scala:35)
at org.apache.spark.sql.internal.BaseSessionStateBuilder.build(BaseSessionStat
eBuilder.scala:289)
at org.apache.spark.sql.SparkSession$.org$apache$spark$sql$SparkSession$$insta
ntiateSessionState(SparkSession.scala:1050)
... 61 more
Caused by: java.lang.RuntimeException: java.lang.RuntimeException: Unable to ins
tantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient
at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:522)

at org.apache.spark.sql.hive.client.HiveClientImpl.<init>(HiveClientImpl.scala
:191)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source)
at java.lang.reflect.Constructor.newInstance(Unknown Source)
at org.apache.spark.sql.hive.client.IsolatedClientLoader.createClient(Isolated
ClientLoader.scala:264)
at org.apache.spark.sql.hive.HiveUtils$.newClientForMetadata(HiveUtils.scala:3
62)
at org.apache.spark.sql.hive.HiveUtils$.newClientForMetadata(HiveUtils.scala:2
66)
at org.apache.spark.sql.hive.HiveExternalCatalog.client$lzycompute(HiveExterna
lCatalog.scala:66)
at org.apache.spark.sql.hive.HiveExternalCatalog.client(HiveExternalCatalog.sc
ala:65)
at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$databaseExists$1.app
ly$mcZ$sp(HiveExternalCatalog.scala:194)
at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$databaseExists$1.app
ly(HiveExternalCatalog.scala:194)
at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$databaseExists$1.app
ly(HiveExternalCatalog.scala:194)
at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalo
g.scala:97)
... 70 more
Caused by: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.h
ive.ql.metadata.SessionHiveMetaStoreClient
at org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStoreUtils.
java:1523)
at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.<init>(RetryingMet
aStoreClient.java:86)
at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingM
etaStoreClient.java:132)
at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingM
etaStoreClient.java:104)
at org.apache.hadoop.hive.ql.metadata.Hive.createMetaStoreClient(Hive.java:300
5)
at org.apache.hadoop.hive.ql.metadata.Hive.getMSC(Hive.java:3024)
at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:503)

... 84 more
Caused by: java.lang.reflect.InvocationTargetException: java.lang.IllegalArgumen
tException: java.net.URISyntaxException: Relative path in absolute URI: file:$%7
Btest.warehouse.dir%7D
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source)
at java.lang.reflect.Constructor.newInstance(Unknown Source)
at org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStoreUtils.
java:1521)
... 90 more
Caused by: java.lang.IllegalArgumentException: java.net.URISyntaxException: Rela
tive path in absolute URI: file:$%7Btest.warehouse.dir%7D
at org.apache.hadoop.fs.Path.initialize(Path.java:205)
at org.apache.hadoop.fs.Path.<init>(Path.java:196)
at org.apache.hadoop.hive.metastore.Warehouse.getDnsPath(Warehouse.java:141)
at org.apache.hadoop.hive.metastore.Warehouse.getDnsPath(Warehouse.java:146)
at org.apache.hadoop.hive.metastore.Warehouse.getWhRoot(Warehouse.java:159)
at org.apache.hadoop.hive.metastore.Warehouse.getDefaultDatabasePath(Warehouse
.java:177)
at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.createDefaultDB_c
ore(HiveMetaStore.java:600)
at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.createDefaultDB(H
iveMetaStore.java:620)
at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.init(HiveMetaStor
e.java:461)
at org.apache.hadoop.hive.metastore.RetryingHMSHandler.<init>(RetryingHMSHandl
er.java:66)
at org.apache.hadoop.hive.metastore.RetryingHMSHandler.getProxy(RetryingHMSHan
dler.java:72)
at org.apache.hadoop.hive.metastore.HiveMetaStore.newRetryingHMSHandler(HiveMe
taStore.java:5762)
at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.<init>(HiveMetaStoreCl
ient.java:199)
at org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient.<init>(Sessio
nHiveMetaStoreClient.java:74)
... 95 more
Caused by: java.net.URISyntaxException: Relative path in absolute URI: file:$%7B
test.warehouse.dir%7D
at java.net.URI.checkPath(Unknown Source)
at java.net.URI.<init>(Unknown Source)
at org.apache.hadoop.fs.Path.initialize(Path.java:202)
... 108 more
<console>:14: error: not found: value spark
import spark.implicits._
^
<console>:14: error: not found: value spark
import spark.sql
^
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.2.0
/_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_141)
Type in expressions to have them evaluated.
Type :help for more information.

scala>


I am using scala 2.12.2, Java 1.8.0, cassandra 3.1.1 versions. Is there any other way i can write SQL query in scala?

Thank You.

Answer Source

From the imports I understand that you're using spark-cassandra-connector. In the version compatibility section they've mentioned that the connector supports Scala 2.10, 2.11 and Cassandra 2.1.5*, 2.2, 3.0 with Spark 2.0, 2.1 with the latest version of connector.

So I'll suggest you to downgrade the scala and cassandra versions and check if it works.

Next, I'll suggest you to change the way youre trying to access the tables. Datastax have provided you with a different API to connect to Cassandra. You may find the relevant documentation here.

You may do something like this with Spark 2.x,

val spark = SparkSession.builder()
.appName("CassTest")
.master("local[2]")
.config("spark.cassandra.connection.host","**.*.**.***")
.config("spark.cassandra.auth.username","****")
.config("spark.cassandra.auth.password","****") 
.getOrCreate()

import spark.implicits._

val df = spark
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(Map( "table" -> "words", "keyspace" -> "test" ))
  .load()

Finally you may do df.show
NOTE: The hive-site.xml fix you tried is to connect Hive with some globally accessible metastore, which itself is a different data store. So, that will not work for Cassandra.

Let me know if this helped. Cheers.

Recommended from our users: Dynamic Network Monitoring from WhatsUp Gold from IPSwitch. Free Download