srinu srinu, 4 years ago
Scala Question

How to insert rows into Cassandra only if they don't exist, using the spark-cassandra connector?

I want to write a DataFrame to Cassandra using the spark-cassandra connector, but skip any row whose primary key already exists in the table. By default the write is an upsert, which would overwrite the other columns of an existing row, and I don't want those columns to change. Is there a way to do that?

Thanks!

Answer Source

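You can use the ifNotExists WriteConf option, which was introduced in this PR. With it enabled, the connector writes each row with INSERT ... IF NOT EXISTS, so a row whose primary key already exists is left untouched. These are lightweight transactions in Cassandra, so expect them to be slower than plain upserts.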

It works like so:

import com.datastax.spark.connector._
import com.datastax.spark.connector.writer.WriteConf

val writeConf = WriteConf(ifNotExists = true)
rdd.saveToCassandra(keyspaceName, tableName, writeConf = writeConf)
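To make the difference from the default upsert concrete, here is a toy in-memory model of the two write behaviours. It is only an illustration: the `WriteSemantics` object, the `Table` alias, and both function names are made up for this sketch and are not part of the connector.

```scala
// Toy model: a "table" is a map from primary key to the value of one other column.
// This is NOT connector code; it only mimics the observable behaviour.
object WriteSemantics {
  type Table = Map[Int, String]

  // Default write: a row with an existing key overwrites the stored value (upsert).
  def upsert(table: Table, rows: Seq[(Int, String)]): Table =
    rows.foldLeft(table) { case (t, (key, value)) => t.updated(key, value) }

  // ifNotExists-style write: a row is applied only when its key is absent.
  def insertIfNotExists(table: Table, rows: Seq[(Int, String)]): Table =
    rows.foldLeft(table) { case (t, (key, value)) =>
      if (t.contains(key)) t else t.updated(key, value)
    }
}
```

Starting from `Map(1 -> "old")` and writing the rows `(1, "new")` and `(2, "b")`, `upsert` replaces the value for key 1, while `insertIfNotExists` keeps "old" and only adds key 2.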

If you are using a DataFrame, you will have to convert it to an RDD first and then map each Row to a tuple or an Array[Any], since the spark-cassandra connector does not yet have an implicit for writing a Row to Cassandra.

A simple example of this is:

val writeConf = WriteConf(ifNotExists = true)
dataFrame.rdd
  .map(row => (row.get(0), row.get(1), ...))
  .saveToCassandra(keyspaceName, tableName, writeConf = writeConf)
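A slightly fuller sketch of the DataFrame case, with typed getters and an explicit column list, might look like the following. The helper name `saveIfNotExists` and the columns `id` (Int) and `name` (String) are assumptions made for illustration; substitute the columns of your own table.

```scala
import org.apache.spark.sql.DataFrame
import com.datastax.spark.connector._
import com.datastax.spark.connector.writer.WriteConf

// Hypothetical helper: writes only the rows whose primary key is not yet in the table.
// Assumes the DataFrame and the Cassandra table both have columns "id" and "name".
def saveIfNotExists(df: DataFrame, keyspace: String, table: String): Unit = {
  val writeConf = WriteConf(ifNotExists = true)
  df.rdd
    // Read each column by name with an explicit type instead of row.get(i).
    .map(row => (row.getAs[Int]("id"), row.getAs[String]("name")))
    // SomeColumns maps the tuple fields, in order, onto the named table columns.
    .saveToCassandra(keyspace, table, SomeColumns("id", "name"), writeConf)
}
```

Note that `WriteConf(ifNotExists = true)` uses the connector's defaults for every other setting. If you configure write options through the Spark configuration, something like `WriteConf.fromSparkConf(sc.getConf).copy(ifNotExists = true)` (with `sc` being your SparkContext) should keep them.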