salvob - 5 months ago

How to parse a JSON-formatted Kafka message in Spark Streaming

I have JSON messages on Kafka like this:


{"id_post":"p1", "message":"blablabla"}



and I want to parse each message and extract the
message
element, to print it or use it in further computation.
With the following code I can print the raw JSON:

val kafkaStream = KafkaUtils.createStream(ssc, zkQuorum, inputGroup, topicMap)
val postStream = kafkaStream.map(_._2)
postStream.foreachRDD((rdd, time) => {
  val count = rdd.count()
  if (count > 0) {
    rdd.foreach(record => {
      println(record)
    })
  }
})


but I can't manage to extract the single element.
I tried a few JSON parsers, but no luck.
Any idea?


Update:
here are the errors I get with different JSON parsers.
This is the code and output with the circe parser:


val parsed_record = parse(record)


and the output:

14:45:00,676 ERROR Executor:95 - Exception in task 0.0 in stage 4.0 (TID 4)
java.lang.NoSuchMethodError: scala.Predef$.ArrowAssoc(Ljava/lang/Object;)Ljava/lang/Object;
at io.circe.jawn.CirceSupportParser$$anon$1$$anon$4.add(CirceSupportParser.scala:36)
at jawn.CharBasedParser$class.parseString(CharBasedParser.scala:90)
at jawn.StringParser.parseString(StringParser.scala:15)
at jawn.Parser.rparse(Parser.scala:397)
at jawn.Parser.parse(Parser.scala:338)
at jawn.SyncParser.parse(SyncParser.scala:24)
at jawn.SupportParser$$anonfun$parseFromString$1.apply(SupportParser.scala:15)


and so on, at the row where I call
parse(record)
.
It looks like it can't access and/or parse the string
record
.

The same happens if I use lift-json:
at the
parse(record)
call, the error output is more or less the same:

16:58:20,425 ERROR Executor:95 - Exception in task 0.0 in stage 4.0 (TID 4)
java.lang.NoSuchMethodError: scala.runtime.ObjectRef.create(Ljava/lang/Object;)Lscala/runtime/ObjectRef;
at net.liftweb.json.JsonParser$$anonfun$2.apply(JsonParser.scala:144)
at net.liftweb.json.JsonParser$$anonfun$2.apply(JsonParser.scala:141)
at net.liftweb.json.JsonParser$.parse(JsonParser.scala:80)
at net.liftweb.json.JsonParser$.parse(JsonParser.scala:45)
at net.liftweb.json.package$.parse(package.scala:40)
at SparkConsumer$$anonfun$main$1$$anonfun$apply$1.apply(SparkConsumer.scala:98)
at SparkConsumer$$anonfun$main$1$$anonfun$apply$1.apply(SparkConsumer.scala:95)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)


at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)

Answer

I solved the issue, so I am writing it here for future reference:

dependencies, dependencies, dependencies!

I chose lift-json, but this applies to any JSON parser and/or framework.

The Spark version I am using (v1.4.1) is the one built against Scala 2.10; here is the dependency from pom.xml:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.10</artifactId>
    <version>1.4.1</version>
    <scope>provided</scope>
</dependency>

along with some other libraries. I was using the lift-json version built for Scala 2.11 ... and that is WRONG.

So, for future me and anyone reading this topic: keep the Scala version consistent across all dependencies. In lift-json's case:

<dependency>
    <groupId>net.liftweb</groupId>
    <artifactId>lift-json_2.10</artifactId>
    <version>3.0-M1</version>
</dependency>