siva siva - 1 month ago 25
Scala Question

Datastax spark cassandra connector - writing DF to cassandra table

We recently started a big-data project using Scala, Spark, and Cassandra, and I am new to all of these technologies. I am trying to do a simple task: write to and read from a Cassandra table. I can achieve this if I keep property names and column names all in lowercase or snake case (underscores), but I want to use camel case in my Scala code. Is there a better way to achieve this, using camel case in Scala and snake case in Cassandra?

We are using:


scala - 2.10.5
spark - 1.6.2
datastax spark-cassandra-connector - 1.6.0
cassandra - 3.0.9.1346
datastax enterprise - 5.0.3


Cassandra table

CREATE TABLE dev.castable (
id int PRIMARY KEY,
long_name text,
name text,
short_name text)


Scala code

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{SQLContext, SaveMode}
import org.apache.spark.sql.cassandra._
import com.datastax.spark.connector._

// case class inferred from the usage below
case class MyRow(id: Int, longName: Option[String], name: String, shortName: Option[String])

val conf = new SparkConf()
  .setAppName("TestHelper")
  .setMaster("local")
  .set("spark.cassandra.connection.host", "127.0.0.1")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)

println("writing data to cassandra")
val df = sqlContext.createDataFrame(List(MyRow(2, Option("long name"), "ss", Option("short name"))))
df.write //*** this is not working
  .cassandraFormat("castable", "dev")
  .mode(SaveMode.Append)
  .save()

println("reading data from cassandra") //*** This is working fine
val rdd = sc.cassandraTable[MyRow]("dev", "castable")
rdd.foreach(println)


Exception

Exception in thread "main" java.util.NoSuchElementException: Columns not found in table dev.castable: longName, shortName
at com.datastax.spark.connector.SomeColumns.selectFrom(ColumnSelector.scala:38)
at com.datastax.spark.connector.writer.TableWriter$.apply(TableWriter.scala:268)
at com.datastax.spark.connector.RDDFunctions.saveToCassandra(RDDFunctions.scala:36)
at org.apache.spark.sql.cassandra.CassandraSourceRelation.insert(CassandraSourceRelation.scala:67)
at org.apache.spark.sql.cassandra.DefaultSource.createRelation(DefaultSource.scala:85)
at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:222)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:148)
at com.aktana.spark.util.LocalTestDriver$.main(LocalTestDriver.scala:38)


I read that the spark-cassandra-connector does this conversion automatically, but it is not working for me.

Answer

When using RDDs, the spark-cassandra-connector automatically converts camel-cased properties to underscored column names; the DataFrame writer does not, which explains the exception above. Thanks again, RussS.
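The mapping the connector applies can be sketched roughly like this (a simplified illustration of the naming convention, not the connector's actual implementation):

```scala
// Simplified sketch of the camelCase -> snake_case mapping the RDD API applies.
object NameMapping {
  def camelToSnake(property: String): String =
    property.replaceAll("([a-z0-9])([A-Z])", "$1_$2").toLowerCase

  def main(args: Array[String]): Unit = {
    // The MyRow fields map to the dev.castable columns:
    Seq("id", "longName", "name", "shortName").foreach { p =>
      println(s"$p -> ${NameMapping.camelToSnake(p)}")
    }
  }
}
```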

Here is how I am saving case class objects to the Cassandra table:

    val writeRDD = sc.makeRDD(List(MyRow(2, Option("long name"), "ss", Option("short name"))))
    writeRDD.saveToCassandra("dev", "castable")
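If you still want to use the DataFrame write path, one workaround (a sketch under the assumption that renaming the columns to the table's snake_case names is acceptable; the `toSnakeCase` helper below is hypothetical, not part of the connector API) is to rename the columns before saving:

```scala
// Hypothetical helper: convert a DataFrame's camelCase column names to
// snake_case so they match the Cassandra table. Plain Scala, no Spark needed.
object DfColumnRename {
  def toSnakeCase(columns: Seq[String]): Seq[String] =
    columns.map(_.replaceAll("([a-z0-9])([A-Z])", "$1_$2").toLowerCase)

  def main(args: Array[String]): Unit = {
    // With the question's DataFrame you would then write (sketch, needs a cluster):
    //   val renamed = df.toDF(toSnakeCase(df.columns): _*)
    //   renamed.write.cassandraFormat("castable", "dev")
    //          .mode(SaveMode.Append).save()
    println(toSnakeCase(Seq("id", "longName", "name", "shortName")))
  }
}
```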