Sandeep Purohit Sandeep Purohit - 1 month ago 4
Scala Question

i want to store each rdd into database in twitter streaming using apache spark but got error of task not serialize in scala

I write a code in which twitter streaming take a

rdd
of tweet class and store each
rdd
in database but it got error task not serialize I paste the code.

sparkstreaming.scala


case class Tweet(id: Long, source: String, content: String, retweet: Boolean, authName: String, username: String, url: String, authId: Long, language: String)

trait SparkStreaming extends Connector {

def startStream(appName: String, master: String): StreamingContext = {
val db = connector("localhost", "rmongo", "rmongo", "pass")
val dbcrud = new DBCrud(db, "table1")
val sparkConf: SparkConf = new SparkConf().setAppName(appName).setMaster(master).set(" spark.driver.allowMultipleContexts", "true").set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// .set("spark.kryo.registrator", "HelloKryoRegistrator")
// sparkConf.registerKryoClasses(Array(classOf[DBCrud]))
val sc: SparkContext = new SparkContext(sparkConf)
val ssc: StreamingContext = new StreamingContext(sc, Seconds(10))
ssc
}
}
object SparkStreaming extends SparkStreaming


I use this streaming context in plat controller to store tweets in database but it throws exception. I'm using mongodb to store it.

def streamstart = Action {
val stream = SparkStreaming
val a = stream.startStream("ss", "local[2]")
val db = connector("localhost", "rmongo", "rmongo", "pass")
val dbcrud = DBCrud
val twitterauth = new TwitterClient().tweetCredantials()
val tweetDstream = TwitterUtils.createStream(a, Option(twitterauth.getAuthorization))
val tweets = tweetDstream.filter { x => x.getUser.getLang == "en" }.map { x => Tweet(x.getId, x.getSource, x.getText, x.isRetweet(), x.getUser.getName, x.getUser.getScreenName, x.getUser.getURL, x.getUser.getId, x.getUser.getLang) }
// tweets.foreachRDD { x => x.foreach { x => dbcrud.insert(x) } }
tweets.saveAsTextFiles("/home/knoldus/sentiment project/spark services/tweets/tweets")
// val s=new BirdTweet()
// s.hastag(a.sparkContext)
a.start()
Ok("start streaming")
}


When make a single of streaming which take tweets and use
forEachRDD
to store each tweet then it works but if I use it from outside it doesn't work.

Please help me.

Answer

Try to create connection with MongoDB inside foreachRDD block, as mentioned in Spark Documentation

tweets.foreachRDD { x => 
 x.foreach { x => 
  val db = connector("localhost", "rmongo", "rmongo", "pass")
  val dbcrud = new DBCrud(db, "table1")
  dbcrud.insert(x) 
 } 
}