
Spark Streaming - Java - Insert JSON from Kafka into Cassandra

I'm writing a simple data pipeline in Spark Streaming, using Java, to pull JSON data from Kafka, parse the JSON into a custom class (Transaction), and then insert that data into a Cassandra table, but I am unable to get the mapToRow() function to work.

I've seen tons of examples that say all you have to do is something along the lines of this:

JavaPairInputDStream<String, String> stream = KafkaUtils.createDirectStream(
        streamingContext,
        String.class,
        String.class,
        StringDecoder.class,
        StringDecoder.class,
        kafkaParams,
        topicsSet
);

JavaDStream<String> lines = stream.map(
        new Function<Tuple2<String, String>, String>() {
            @Override
            public String call(Tuple2<String, String> tuple2) {
                return tuple2._2();
            }
        }
);

javaFunctions(lines).writerBuilder("myKeyspace", "myTableName", mapToRow(Transaction.class)).saveToCassandra();
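
(For context, kafkaParams and topicsSet aren't defined in these examples. With the Kafka 0.8 direct-stream API they would be built roughly as follows; the broker address and topic name here are placeholders, not from the original post:)

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

Map<String, String> kafkaParams = new HashMap<String, String>();
// Placeholder broker list; point this at your own Kafka brokers.
kafkaParams.put("metadata.broker.list", "localhost:9092");
// Placeholder topic name for the incoming transaction JSON.
Set<String> topicsSet = Collections.singleton("transactions");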


However, when I run the example above, I get the error:

The method mapToRow(Class<Transaction>) is undefined for the type SaveTransactions


I think all I'm missing is some sort of annotation on my class, but I have not been successful in figuring out which one. I've tried going bare bones, essentially making the class a property bag:

public class Transaction implements java.io.Serializable {

    public int TransactionId;
    ...

    public Transaction() {}
}


I've tried all of the DataStax mapping annotations:

@Table(keyspace = "myKeyspace", name = "myTableName",
       readConsistency = "QUORUM",
       writeConsistency = "QUORUM",
       caseSensitiveKeyspace = false,
       caseSensitiveTable = false)
public class Transaction implements java.io.Serializable {

    @PartitionKey(0)
    @Column(name = "transaction_id")
    public int TransactionId;
    ...

    public Transaction() {}
}


I also tried establishing public get/set methods for each property and setting the properties to private:

public class Transaction implements java.io.Serializable {

    private int transactionId;
    ...

    public Transaction() {}

    public int getTransactionId() {
        return transactionId;
    }

    public void setTransactionId(int transactionId) {
        this.transactionId = transactionId;
    }
}


I have been able to parse the DStream into an RDD of Transactions using the class below:

public class Transaction implements java.io.Serializable {

    ...

    public static class ParseJSON implements FlatMapFunction<Iterator<String>, Transaction> {
        @Override
        public Iterable<Transaction> call(Iterator<String> lines) throws Exception {
            ArrayList<Transaction> transactions = new ArrayList<Transaction>();
            ObjectMapper mapper = new ObjectMapper();
            while (lines.hasNext()) {
                String line = lines.next();
                try {
                    transactions.add(mapper.readValue(line, Transaction.class));
                } catch (Exception e) {
                    // Skip records that fail to deserialize rather than failing the batch.
                    System.out.println("Skipped: " + e);
                }
            }

            return transactions;
        }
    }
}


In conjunction with the following code, acting on the lines object from above:

JavaDStream<Transaction> events = lines.mapPartitions(new Transaction.ParseJSON());


However, once I have it in this form, it still doesn't work with the writerBuilder().saveToCassandra() chain.

Any help here is greatly appreciated.

Answer

It turns out the issue was just a missing import. I had imported com.datastax.spark.connector.japi.CassandraStreamingJavaUtil.* thinking it would give me everything I needed; however, I also needed to bring in com.datastax.spark.connector.japi.CassandraJavaUtil.* for the .mapToRow() function.
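
Concretely, both static imports are needed; javaFunctions(...) for a DStream lives in CassandraStreamingJavaUtil, while mapToRow(...) lives in CassandraJavaUtil:

import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow;
import static com.datastax.spark.connector.japi.CassandraStreamingJavaUtil.javaFunctions;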

Once I resolved this, I began getting the following error:

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/sql/catalyst/package$ScalaReflectionLock$
    at org.apache.spark.sql.catalyst.ReflectionLock$.<init>(ReflectionLock.scala:5)
    at org.apache.spark.sql.catalyst.ReflectionLock$.<clinit>(ReflectionLock.scala)
    at com.datastax.spark.connector.mapper.ReflectionColumnMapper.<init>(ReflectionColumnMapper.scala:38)
    at com.datastax.spark.connector.mapper.JavaBeanColumnMapper.<init>(JavaBeanColumnMapper.scala:10)
    at com.datastax.spark.connector.util.JavaApiHelper$.javaBeanColumnMapper(JavaApiHelper.scala:93)
    at com.datastax.spark.connector.util.JavaApiHelper.javaBeanColumnMapper(JavaApiHelper.scala)
    at com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow(CassandraJavaUtil.java:1204)
    at com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow(CassandraJavaUtil.java:1222)
    at globalTransactions.Process.main(Process.java:77)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.catalyst.package$ScalaReflectionLock$
    at java.net.URLClassLoader.findClass(Unknown Source)
    at java.lang.ClassLoader.loadClass(Unknown Source)
    at sun.misc.Launcher$AppClassLoader.loadClass(Unknown Source)
    at java.lang.ClassLoader.loadClass(Unknown Source)
    ... 9 more

That error was resolved by pulling in the spark-sql project:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.10</artifactId>
    <version>1.6.2</version>
</dependency>
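
With both utilities imported and spark-sql on the classpath, the write from the question works. A minimal sketch of the final save, assuming the events stream of Transactions parsed earlier:

// With the static imports above: javaFunctions(JavaDStream) comes from
// CassandraStreamingJavaUtil, and mapToRow(Class) comes from CassandraJavaUtil.
javaFunctions(events)
        .writerBuilder("myKeyspace", "myTableName", mapToRow(Transaction.class))
        .saveToCassandra();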

Hope this helps the next guy/gal.