ammills01 - 1 year ago
Java Question

Spark Streaming - Java - Insert JSON from Kafka into Cassandra

I'm writing a simple data pipeline in Spark Streaming, using Java, to pull JSON data from Kafka, parse the JSON into a custom class (Transaction), then insert that data into a Cassandra table, but I am unable to get the saveToCassandra() function to work.

I've seen tons of examples that say all you have to do is something along the lines of this:

JavaPairInputDStream<String, String> stream = KafkaUtils.createDirectStream(
        jssc, String.class, String.class,
        StringDecoder.class, StringDecoder.class,
        kafkaParams, topics);

JavaDStream<String> lines = stream.map(
        new Function<Tuple2<String, String>, String>() {
            public String call(Tuple2<String, String> tuple2) {
                return tuple2._2();
            }
        });

javaFunctions(lines).writerBuilder("myKeyspace", "myTableName", mapToRow(Transaction.class)).saveToCassandra();

However, when I do this I get the error:

The method mapToRow(Class<Transaction>) is undefined for the type SaveTransactions

I think all I am lacking is some sort of decoration on my class but I have not been successful in figuring out which one. I've tried going bare bones, essentially making the class a property bag:

public class Transaction implements Serializable {

    public int TransactionId;

    public Transaction() {}
}

I've tried all of the DataStax mapping annotations:

@Table(keyspace = "myKeyspace", name = "myTableName",
       readConsistency = "QUORUM",
       writeConsistency = "QUORUM",
       caseSensitiveKeyspace = false,
       caseSensitiveTable = false)
public class Transaction implements Serializable {

    public int TransactionId;

    public Transaction() {}
}

I also tried establishing public get/set methods for each property and setting the properties to private:

public class Transaction implements Serializable {

    private int transactionId;

    public Transaction() {}

    public int getTransactionId() {
        return transactionId;
    }

    public void setTransactionId(int transactionId) {
        this.transactionId = transactionId;
    }
}
I have been able to parse the JSON into a JavaDStream<Transaction> using the class below:

public class Transaction implements Serializable {

    public static class ParseJSON implements FlatMapFunction<Iterator<String>, Transaction> {
        public Iterable<Transaction> call(Iterator<String> lines) throws Exception {
            ArrayList<Transaction> transactions = new ArrayList<Transaction>();
            ObjectMapper mapper = new ObjectMapper();
            while (lines.hasNext()) {
                String line =;
                try {
                    transactions.add(mapper.readValue(line, Transaction.class));
                } catch (Exception e) {
                    System.out.println("Skipped: " + e);
                }
            }
            return transactions;
        }
    }
}

In conjunction with the following code, acting on the lines object from above:

JavaDStream<Transaction> events = lines.mapPartitions(new Transaction.ParseJSON());

However, once I have it in this form, it still doesn't work with the writerBuilder().saveToCassandra() chain.

Any help here is greatly appreciated.

Answer

Turns out the issue was just an import issue. I had imported com.datastax.spark.connector.japi.CassandraStreamingJavaUtil.* thinking it would give me everything I needed; however, I also needed to bring in com.datastax.spark.connector.japi.CassandraJavaUtil.* for the .mapToRow() function.
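Concretely, both static-import lines need to be present at the top of the driver class; each utility class contributes a different half of the chain:

```java
// CassandraStreamingJavaUtil supplies javaFunctions(...) for DStreams;
// CassandraJavaUtil supplies mapToRow(...).
import static com.datastax.spark.connector.japi.CassandraStreamingJavaUtil.*;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.*;
```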

Once I resolved this, I began getting the following error:

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/sql/catalyst/package$ScalaReflectionLock$
    at org.apache.spark.sql.catalyst.ReflectionLock$.<init>(ReflectionLock.scala:5)
    at org.apache.spark.sql.catalyst.ReflectionLock$.<clinit>(ReflectionLock.scala)
    at com.datastax.spark.connector.mapper.ReflectionColumnMapper.<init>(ReflectionColumnMapper.scala:38)
    at com.datastax.spark.connector.mapper.JavaBeanColumnMapper.<init>(JavaBeanColumnMapper.scala:10)
    at com.datastax.spark.connector.util.JavaApiHelper$.javaBeanColumnMapper(JavaApiHelper.scala:93)
    at com.datastax.spark.connector.util.JavaApiHelper.javaBeanColumnMapper(JavaApiHelper.scala)
    at com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow(
    at com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow(
    at globalTransactions.Process.main(
Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.catalyst.package$ScalaReflectionLock$
    at Source)
    at java.lang.ClassLoader.loadClass(Unknown Source)
    at sun.misc.Launcher$AppClassLoader.loadClass(Unknown Source)
    at java.lang.ClassLoader.loadClass(Unknown Source)
    ... 9 more

Which was resolved by pulling in the spark-sql project:
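The original dependency snippet was lost here; for a Maven build it would look something along these lines (the Scala suffix and version are assumptions — match them to your Spark and Scala versions):

```xml
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.10</artifactId>
    <version>${spark.version}</version> <!-- must match your other Spark artifacts -->
</dependency>
```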


Hope this helps the next guy/gal.