Rolf Lobo - 5 months ago

Error while extracting JSON into a case class (Scala / Flink)

The problem is in the map function that does the case class extraction: Flink reports that the task is not serializable, even though I have defined the json4s formats implicitly as DefaultFormats.

package org.apache.flink.quickstart

import java.util.Properties

import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.flink.api.scala._
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.json4s.DefaultFormats
import org.json4s._
import org.json4s.native.JsonMethods

import scala.util.Try

case class CC(key: String)

object WordCount {
  def main(args: Array[String]) {

    implicit val formats = org.json4s.DefaultFormats

    // Kafka properties
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "***.**.*.***:9093")
    properties.setProperty("zookeeper.connect", "***.**.*.***:2181")
    properties.setProperty("group.id", "afs")
    properties.setProperty("auto.offset.reset", "earliest")

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val st = env
      .addSource(new FlinkKafkaConsumer09("new", new SimpleStringSchema(), properties))
      .flatMap(raw => JsonMethods.parse(raw).toOption)
      // .map(_.extract[CC])

    val l = st.map(_.extract[CC])

    st.print()
    env.execute()
  }
}


The error:


INFO [main] (TypeExtractor.java:1804) - No fields detected for class
org.json4s.JsonAST$JValue. Cannot be used as a PojoType. Will be
handled as GenericType
Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Task not
serializable
at org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:172)
at org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:164)
at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.scalaClean(StreamExecutionEnvironment.scala:666)
at org.apache.flink.streaming.api.scala.DataStream.clean(DataStream.scala:994)
at org.apache.flink.streaming.api.scala.DataStream.map(DataStream.scala:519)
at org.apache.flink.quickstart.WordCount$.main(WordCount.scala:38)
at org.apache.flink.quickstart.WordCount.main(WordCount.scala)
Caused by: java.io.NotSerializableException: org.json4s.DefaultFormats$$anon$4
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:317)
at org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:170)
... 6 more

Process finished with exit code 1


Answer

Flink serializes the map function's closure before shipping it to the workers; that closure captures the local formats value, and the anonymous class behind DefaultFormats is not serializable. The solution is to put

implicit val formats = org.json4s.DefaultFormats

outside the main function, like this:

object WordCount {

  implicit val formats = org.json4s.DefaultFormats

  def main(args: Array[String]) {
This works because the closure then refers to the formats member of the WordCount object, which is accessed statically rather than captured and serialized with the task. Alternatively, lazily initialize formats:

implicit lazy val formats = org.json4s.DefaultFormats

inside the main function, like this:

def main(args: Array[String]) {

    implicit lazy val formats = org.json4s.DefaultFormats
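
For completeness, here is a minimal sketch of the corrected program using the first variant (formats as an object member). The masked broker/ZooKeeper addresses and the topic name "new" are the placeholders from the question, not working endpoints:

package org.apache.flink.quickstart

import java.util.Properties

import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.json4s._
import org.json4s.native.JsonMethods

case class CC(key: String)

object WordCount {

  // Object member: the map closure reads this through the WordCount
  // object instead of capturing a local value, so the non-serializable
  // DefaultFormats instance never ends up inside the serialized task.
  implicit val formats = org.json4s.DefaultFormats

  def main(args: Array[String]) {
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "***.**.*.***:9093") // placeholder
    properties.setProperty("zookeeper.connect", "***.**.*.***:2181") // placeholder
    properties.setProperty("group.id", "afs")
    properties.setProperty("auto.offset.reset", "earliest")

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val st = env
      .addSource(new FlinkKafkaConsumer09("new", new SimpleStringSchema(), properties))
      .flatMap(raw => JsonMethods.parse(raw).toOption) // drop unparseable records

    // The extraction now passes Flink's closure cleaner.
    val extracted = st.map(_.extract[CC])

    extracted.print()
    env.execute()
  }
}

The lazy variant presumably avoids the capture in a similar way, since the value has not been materialized at the point where the closure is serialized.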