
How to use SQLContext and SparkContext inside foreachPartition

I want to use `SparkContext` and `SQLContext` inside `foreachPartition`, but I am unable to do so due to a serialization error. I know that neither object is serializable, but I thought that `foreachPartition` was executed on the master, where both the SparkContext and the SQLContext are available.
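
As a minimal sketch of why this fails (using the same `myDStream` and `sc` as in the code below): the closure passed to `foreachPartition` is serialized and shipped to the executors, so any reference to `sc` or `sqlContext` captured inside it fails before the job even runs:

    // Sketch of the failure mode: the foreachPartition closure runs on
    // the executors, not on the master/driver, so capturing sc inside it
    // throws org.apache.spark.SparkException: Task not serializable.
    myDStream.foreachRDD(rdd => {
      rdd.foreachPartition { iter =>
        val innerRdd = sc.makeRDD(iter.toSeq) // sc captured here -> error
      }
    })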

Notation:

`msg -> Map[String,String]`
`result -> Iterable[Seq[Row]]`


This is my current code (`UtilsDM` is an object that `extends Serializable`). The part that fails starts at `val schema = ...`, where I want to write `result` to a `DataFrame` and then save it to Parquet. Maybe the way I organized the code is inefficient, so I'd like to hear your recommendations. Thanks.

    // Here I am creating df from a Parquet file on S3
    val exists = FileSystem
      .get(new URI("s3n://" + bucketNameCode), sc.hadoopConfiguration)
      .exists(new Path("s3n://" + bucketNameCode + "/" + pathToSentMessages))
    var df: DataFrame = null
    if (exists) {
      df = sqlContext.read.parquet("s3n://bucket/pathToParquetFile")
    }
    UtilsDM.setDF(df)

    // Here I process myDStream
    myDStream.foreachRDD(rdd => {
      rdd.foreachPartition { iter =>
        val r = new RedisClient(UtilsDM.getHost, UtilsDM.getPort)
        val producer = UtilsDM.createProducer
        var df = UtilsDM.getDF
        val result = iter.map { msg =>
          // ...
          Seq(msg("key"), msg("value"))
        }

        // HERE I WANT TO WRITE result TO S3, BUT IT FAILS
        val schema = StructType(
          StructField("key", StringType, true) ::
          StructField("value", StringType, true) :: Nil)

        result.foreach { row =>
          val rdd = sc.makeRDD(row)
          val df2 = sqlContext.createDataFrame(rdd, schema)

          // If the Parquet file is not created yet, create it
          var df_final: DataFrame = null
          if (df != null) {
            df_final = df.unionAll(df2)
          } else {
            df_final = df2
          }
          df_final.write.parquet("s3n://bucket/pathToSentMessages")
        }
      }
    })


EDIT:

I am using Spark 1.6.2 and Scala 2.10.6.

Answer

It is not possible. `SparkContext`, `SQLContext`, and `SparkSession` can be used only on the driver. You can use `sqlContext` at the top level of `foreachRDD`:

    myDStream.foreachRDD(rdd => {
      val df = sqlContext.createDataFrame(rdd, schema)
      ...
    })

You cannot use it inside a transformation or action:

    myDStream.foreachRDD(rdd => {
      rdd.foreach { record =>
        val df = sqlContext.createDataFrame(...)
        ...
      }
    })

You probably want the equivalent of:

    myDStream.foreachRDD(rdd => {
      val foo = rdd.mapPartitions(iter => doSomethingWithRedisClient(iter))
      val df = sqlContext.createDataFrame(foo, schema)
      df.write.parquet("s3n://bucket/pathToSentMessages")
    })
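
Putting it together for this question, a sketch of that pattern might look like the following. This is an illustration, not a drop-in fix: `processWithRedis` is a hypothetical helper standing in for the per-partition Redis/producer work, while `myDStream`, `sqlContext`, `df`, `UtilsDM`, and `RedisClient` are taken from the question. The per-message work runs on the executors via `mapPartitions`; `createDataFrame`, `unionAll`, and the Parquet write all stay on the driver:

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types.{StringType, StructField, StructType}

    val schema = StructType(
      StructField("key", StringType, true) ::
      StructField("value", StringType, true) :: Nil)

    // Hypothetical per-partition worker: runs on the executors, so it may
    // open a Redis connection, but it must not touch sc or sqlContext.
    def processWithRedis(iter: Iterator[Map[String, String]]): Iterator[Row] = {
      val r = new RedisClient(UtilsDM.getHost, UtilsDM.getPort)
      iter.map { msg =>
        // ... per-message work using r ...
        Row(msg("key"), msg("value"))
      }
    }

    myDStream.foreachRDD(rdd => {
      // Executor side: no SparkContext / SQLContext in this closure
      val rows: RDD[Row] = rdd.mapPartitions(processWithRedis)

      // Driver side: the contexts are available here
      val df2 = sqlContext.createDataFrame(rows, schema)
      val dfFinal = if (df != null) df.unionAll(df2) else df2
      dfFinal.write.parquet("s3n://bucket/pathToSentMessages")
    })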