Lobsterrrr - 1 month ago
Scala Question

How to convert RDD to DataFrame in Spark Streaming, not just Spark

How can I convert an RDD to a DataFrame in Spark Streaming, not just in plain Spark?

I saw this example, but it requires a SparkContext:

val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
rdd.toDF()


In my case I have a StreamingContext. Should I then create a SparkContext inside foreach? That seems too crazy... So, how do I deal with this? My final goal (in case it is useful) is to save the DataFrame to Amazon S3 using rdd.toDF.write.format("json").saveAsTextFile("s3://iiiii/ttttt.json"), which (as far as I know) is not possible for an RDD without first converting it to a DataFrame.

myDstream.foreachRDD { rdd =>
  val conf = new SparkConf().setMaster("local").setAppName("My App")
  val sc = new SparkContext(conf)
  val sqlContext = new SQLContext(sc)
  import sqlContext.implicits._
  rdd.toDF()
}

Answer

Create the sqlContext outside foreachRDD. Once you have converted the rdd to a DataFrame using that sqlContext, you can write it to S3.

For example:

val conf = new SparkConf().setMaster("local").setAppName("My App")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

myDstream.foreachRDD { rdd =>
  val df = rdd.toDF()
  // DataFrameWriter has no saveAsTextFile; use save() with the json format
  df.write.format("json").save("s3://iiiii/ttttt.json")
}

Update:

You can even create the sqlContext inside foreachRDD, since the body of foreachRDD executes on the driver.
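If you do want to obtain the context inside foreachRDD, a common pattern (recommended by the Spark Streaming programming guide) is to fetch a lazily instantiated singleton via SQLContext.getOrCreate from the RDD's own SparkContext, so a new context is not built for every micro-batch. Below is a sketch under the assumptions of the question: the DStream carries Strings, and the S3 path is the placeholder from the question; the append mode is my addition so that successive batches do not collide on the same path.

```scala
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.dstream.DStream

def saveToS3(myDstream: DStream[String]): Unit = {
  myDstream.foreachRDD { rdd =>
    // Reuse (or lazily create) a single SQLContext tied to this RDD's SparkContext
    val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
    import sqlContext.implicits._

    // Skip empty micro-batches to avoid writing empty files
    if (!rdd.isEmpty) {
      val df = rdd.toDF()
      // append so each batch adds new files instead of failing on an existing path
      df.write.mode("append").format("json").save("s3://iiiii/ttttt.json")
    }
  }
}
```

This keeps the streaming code free of any hand-built SparkContext: the context that backs the DStream is reused throughout.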