Yamini Yamini - 3 months ago
Scala Question

Spark SQL to insert data into Cassandra

I am a beginner with Scala and Apache Spark, and I am facing the problem below.

I am trying to insert data into a Cassandra table user (name, favorite_food) using Spark SQL.

The code snippet looks like this:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.cassandra.CassandraSQLContext

val conf = new SparkConf(true)
  .set("spark.cassandra.connection.host", "127.0.0.1")

val sc = new SparkContext("local", "test", conf)
val sqlC = new CassandraSQLContext(sc)
sqlC.setKeyspace("myKeySpace")
sqlC.sql("INSERT into user (name,favorite_food) values ('John Doe','brownies')")


However, I get this exception:
Exception in thread "main" java.lang.RuntimeException: [1.13] failure: ``table'' expected but identifier user found

I am running a local instance of Cassandra.

My Maven POM looks like this:

<dependencies>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.6.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.10</artifactId>
    <version>1.6.2</version>
  </dependency>
</dependencies>


My question is: why do I get the error below? (The same INSERT statement works perfectly fine in the Cassandra shell.)
Exception in thread "main" java.lang.RuntimeException: [1.13] failure: ``table'' expected but identifier user found

PS: I do know I can use the Spark connector provided by DataStax to save data to Cassandra, but I want to use Spark SQL...is that possible?

Answer

You can't insert data into a table through the CassandraSQLContext this way: Spark 1.6's SQL parser does not support the INSERT ... VALUES syntax, which is why it reports that ``table'' was expected where the identifier user appears.
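More precisely, the parse error comes from the INSERT ... VALUES form: Spark 1.6's parser only accepts the Hive-style INSERT INTO TABLE ... SELECT ... shape. So a statement like the following sketch may work for copying rows from another table visible to the same context (other_table is a hypothetical source table, not from the question, and a running Cassandra instance is required):

```scala
// Hive-style insert that Spark 1.6's parser accepts. Assumes an existing
// CassandraSQLContext (sqlC) as in the question; "other_table" is a
// hypothetical source table in the same keyspace.
sqlC.sql("INSERT INTO TABLE user SELECT name, favorite_food FROM other_table")
```

Note that this only moves rows that Spark SQL can already SELECT; it does not let you insert literal values directly.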

Try the following instead; it writes through the DataStax connector's RDD API:

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import com.datastax.spark.connector._

// Define a case class that mirrors the table's columns:
// case class Name(columnName: DataType, columnName: DataType)
case class model(name: String, favorite_food: String)

// Build an RDD of the rows you want to insert:
// val coll = sc.parallelize(Seq(model(data, data), model(data, data)))
val collection = sc.parallelize(Seq(model("John Doe", "brownies")))

// Then save it to Cassandra:
// collection.saveToCassandra("keyspace_name", "table_name", SomeColumns("col_name", "col_name"))
collection.saveToCassandra("myKeySpace", "user", SomeColumns("name", "favorite_food"))
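If you would rather stay in the Spark SQL/DataFrame world than use the RDD API, the connector also registers a Cassandra data source you can write through. A minimal sketch, assuming Spark 1.6, spark-cassandra-connector 1.6 on the classpath, a running local Cassandra, and the same sc, keyspace, and table as above:

```scala
import org.apache.spark.sql.{SQLContext, SaveMode}

val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

// Build a one-row DataFrame whose column names match the Cassandra table.
val df = sc.parallelize(Seq(("John Doe", "brownies")))
  .toDF("name", "favorite_food")

// Append it to myKeySpace.user through the connector's data source.
df.write
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "myKeySpace", "table" -> "user"))
  .mode(SaveMode.Append)
  .save()
```

This gives you the DataFrame half of Spark SQL for writes, even though the raw INSERT ... VALUES statement itself is not supported.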

Thanks, Aravinth