Sandeep Purohit - 1 year ago
Scala Question

I want to store each RDD in a database while streaming from Twitter with Apache Spark, but I get a "Task not serializable" error in Scala

I wrote code in which a Twitter stream produces a DStream of the Tweet class and stores each RDD in a database, but it fails with a "Task not serializable" error. Here is the code:


case class Tweet(id: Long, source: String, content: String, retweet: Boolean, authName: String, username: String, url: String, authId: Long, language: String)

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

trait SparkStreaming extends Connector {

  def startStream(appName: String, master: String): StreamingContext = {
    val db = connector("localhost", "rmongo", "rmongo", "pass")
    val dbcrud = new DBCrud(db, "table1")
    val sparkConf: SparkConf = new SparkConf()
      .setAppName(appName)
      .setMaster(master)
      .set("spark.driver.allowMultipleContexts", "true")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    // .set("spark.kryo.registrator", "HelloKryoRegistrator")
    // sparkConf.registerKryoClasses(Array(classOf[DBCrud]))
    val sc: SparkContext = new SparkContext(sparkConf)
    val ssc: StreamingContext = new StreamingContext(sc, Seconds(10))
    ssc // hand the streaming context back to the caller
  }
}

object SparkStreaming extends SparkStreaming

I use this streaming context in a Play controller to store the tweets in the database, but it throws an exception. I'm using MongoDB for storage.

def streamstart = Action {
  val stream = SparkStreaming
  val a = stream.startStream("ss", "local[2]")
  val db = connector("localhost", "rmongo", "rmongo", "pass")
  val dbcrud = DBCrud
  val twitterauth = new TwitterClient().tweetCredantials()
  val tweetDstream = TwitterUtils.createStream(a, Option(twitterauth.getAuthorization))
  // Keep English-language users' tweets and map each status to a Tweet
  val tweets = tweetDstream
    .filter { x => x.getUser.getLang == "en" }
    .map { x =>
      Tweet(x.getId, x.getSource, x.getText, x.isRetweet(), x.getUser.getName,
        x.getUser.getScreenName, x.getUser.getURL, x.getUser.getId, x.getUser.getLang)
    }
  // Storing each tweet this way is what fails with "Task not serializable"
  // tweets.foreachRDD { x => x.foreach { x => dbcrud.insert(x) } }
  tweets.saveAsTextFiles("/home/knoldus/sentiment project/spark services/tweets/tweets")
  // val s=new BirdTweet()
  // s.hastag(a.sparkContext)
  Ok("start streaming")
}

When I put everything in a single streaming object that takes the tweets and uses foreachRDD to store each tweet, it works, but if I call it from outside (from the controller) it doesn't work.

Please help me.

Answer

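Try creating the MongoDB connection inside the foreachRDD block, as described in the Spark documentation (the "Design Patterns for using foreachRDD" section of the Spark Streaming Programming Guide):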

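tweets.foreachRDD { rdd =>
  rdd.foreach { tweet =>
    // Created on the executor, so no driver-side DBCrud is captured by the closure
    val db = connector("localhost", "rmongo", "rmongo", "pass")
    val dbcrud = new DBCrud(db, "table1")
    dbcrud.insert(tweet)
  }
}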
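Why this helps: before a task runs on an executor, Spark serializes the closure together with everything it captures, and as far as I know it always uses Java serialization for closures (the spark.serializer=KryoSerializer setting only applies to data), which would explain why registering DBCrud with Kryo does not help. The plain-Scala sketch below, with no Spark dependency, imitates that step. FakeDbCrud, trySerialize, capturingClosure and selfContainedClosure are hypothetical names made up for illustration; FakeDbCrud stands in for DBCrud and is deliberately not Serializable.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for DBCrud: it would hold a live MongoDB connection,
// so it deliberately does NOT extend Serializable.
class FakeDbCrud(val table: String) {
  def insert(record: String): Unit = println(s"insert into $table: $record")
}

object ClosureSerializationDemo {
  // Roughly what Spark does before shipping a task: Java-serialize the
  // closure together with everything it captures.
  def trySerialize(f: AnyRef): Either[String, Int] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try {
      out.writeObject(f)
      out.flush()
      Right(bytes.size())
    } catch {
      case e: NotSerializableException => Left(e.getMessage)
    } finally out.close()
  }

  // Like the commented-out foreachRDD in the question: the closure captures
  // a FakeDbCrud that was built outside it (on the driver).
  def capturingClosure(crud: FakeDbCrud): String => Unit =
    record => crud.insert(record)

  // Like the answer: the FakeDbCrud is created inside the closure,
  // so nothing non-serializable is captured.
  def selfContainedClosure(): String => Unit = { record =>
    val crud = new FakeDbCrud("table1")
    crud.insert(record)
  }

  def main(args: Array[String]): Unit = {
    // Prints a Left naming FakeDbCrud (the class that could not be serialized)
    println(trySerialize(capturingClosure(new FakeDbCrud("table1"))))
    // Prints a Right with the serialized size in bytes
    println(trySerialize(selfContainedClosure()))
  }
}
```

In real code, the guide's recommended variant opens the connection inside rdd.foreachPartition instead of rdd.foreach, so one connection is created per partition rather than one per tweet. One caveat: if connector is an instance method inherited from the Connector trait, calling it inside the closure captures the enclosing controller, which would then also have to be serializable; moving the connection code into a standalone object avoids that.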