Suresh Suresh - 1 year ago 174
Scala Question

Unable to serialize SparkContext in foreachRDD

I am trying to save the streaming data to cassandra from Kafka. I am able to read and parse the data but when I call below lines to save the data i am getting a

Task not Serializable
Exception. My class is extending serializable but not sure why i am seeing this error, didn't get much help ever after googling for 3 hours, can some body give any pointers ?

val collection = sc.parallelize(Seq((,
collection.saveToCassandra("testKS", "testTable ", SomeColumns("id", "data"))`

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SaveMode
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka.KafkaUtils
import com.datastax.spark.connector._

import kafka.serializer.StringDecoder
import org.apache.spark.rdd.RDD
import com.datastax.spark.connector.SomeColumns
import java.util.Formatter.DateTime

object StreamProcessor extends Serializable {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[2]").setAppName("StreamProcessor")
.set("", "")

val sc = new SparkContext(sparkConf)

val ssc = new StreamingContext(sc, Seconds(2))

val sqlContext = new SQLContext(sc)

val kafkaParams = Map("" -> "localhost:9092")

val topics = args.toSet

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topics)

stream.foreachRDD { rdd =>

if (!rdd.isEmpty()) {
try {

rdd.foreachPartition { iter =>
iter.foreach {
case (key, msg) =>

val obj = msgParseMaster(msg)

val collection = sc.parallelize(Seq((,
collection.saveToCassandra("testKS", "testTable ", SomeColumns("id", "data"))






import org.json4s._
import org.json4s.native.JsonMethods._
case class wordCount(id: Long, data: String) extends serializable
implicit val formats = DefaultFormats
def msgParseMaster(msg: String): wordCount = {
val m = parse(msg).extract[wordCount]
return m



I am getting

org.apache.spark.SparkException: Task not serializable

below is the full log

16/08/06 10:24:52 ERROR JobScheduler: Error running job streaming job 1470504292000 ms.0
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:919)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:918)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:918)

Answer Source

SparkContext isn't serializable, you can't use it inside foreachRDD, and from the use of your graph you don't need it. Instead, you can simply map over each RDD, parse out the relevant data and save that new RDD to cassandra:

  .map { 
    case (_, msg) => 
      val result = msgParseMaster(msg)
  .foreachRDD(rdd => if (!rdd.isEmpty) rdd.saveToCassandra("testKS", "testTable ", SomeColumns("id", "data")))