wzz wzz - 3 months ago 21
Scala Question

Operating on an RDD fails after setting the Spark record delimiter with org.apache.hadoop.conf.Configuration

I want to process a large text file, "mydata.txt" (the actual file is about 30 GB), with Spark. Its record delimiter is "\ |" followed by "\n". Because the default record separator when loading a file with "sc.textFile" is "\n", I set the "textinputformat.record.delimiter" property of org.apache.hadoop.conf.Configuration to "\ |\n" to specify the record delimiter. A sample of the data:

AAAAA_|BBBBB_|
CCCCC\
DDDDD
EEEEE_FFFFFFFFFFFF\ |
GGGGG_|HHHHH_|
IIIII\
GGGGG\
KKKKK_|LLLLLLLLLLL\ |
MMMM_|NNNNN_|OOOOO\ |
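
The intended splitting can be checked locally without Spark. The following is only a sketch in plain Scala: the object name DelimiterCheck and its members are made up for illustration, and the sample string is assumed to end with the record delimiter, as the data above does. Note that String.split takes a regex, so the record delimiter has to be quoted here, whereas Hadoop treats it as literal text.

```scala
import java.util.regex.Pattern

// Hypothetical helper for a local sanity check; not part of Spark or Hadoop.
object DelimiterCheck {
  // Literal record delimiter: backslash, space, pipe, newline.
  val LineDelimiter = "\\ |\n"
  // Regex field separator: underscore followed by a literal pipe.
  val FieldSep = "_\\|"

  // The sample data shown above, with a trailing newline after the last record.
  val sample: String = Seq(
    "AAAAA_|BBBBB_|",
    "CCCCC\\",
    "DDDDD",
    "EEEEE_FFFFFFFFFFFF\\ |",
    "GGGGG_|HHHHH_|",
    "IIIII\\",
    "GGGGG\\",
    "KKKKK_|LLLLLLLLLLL\\ |",
    "MMMM_|NNNNN_|OOOOO\\ |"
  ).mkString("\n") + "\n"

  // Pattern.quote makes split treat the delimiter literally; unquoted,
  // the '|' would be read as a regex alternation.
  def records(text: String): Array[String] =
    text.split(Pattern.quote(LineDelimiter))

  // Number of fields in each record.
  def fieldCounts(text: String): Array[Int] =
    records(text).map(_.split(FieldSep).length)
}
```

With this sample, DelimiterCheck.records(DelimiterCheck.sample) yields three records, each of which has at least three fields.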


Next I executed the following code in spark-shell:

import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

val LINE_DELIMITER = "\\ |\n"
val FIELD_SEP = "_\\|"

val conf = new Configuration
conf.set("textinputformat.record.delimiter", LINE_DELIMITER)
val raw_data = sc.newAPIHadoopFile("mydata.txt", classOf[TextInputFormat], classOf[LongWritable], classOf[Text], conf).map(_._2.toString)


So far so good. However,

scala> val data = raw_data.filter(x => x.split(FIELD_SEP).size >= 3)
data: org.apache.spark.rdd.RDD[String] = FilteredRDD[4] at filter at <console>:22

scala> data.collect
org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: org.apache.hadoop.conf.Configuration
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1049)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1033)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1031)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1031)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:772)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:715)
at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:699)
at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1203)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

scala> data.foreach(println)
org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: org.apache.hadoop.conf.Configuration
...


Why can't I manipulate the RDD "data", when everything works fine with sc.textFile("mydata.txt")? And how can I fix it?

Answer

You are getting this exception because your closure captures the org.apache.hadoop.conf.Configuration object, which is not serializable:

Caused by: java.io.NotSerializableException: org.apache.hadoop.conf.Configuration
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
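
The mechanism can be reproduced without Spark or Hadoop. The sketch below is an illustration under assumptions, not what spark-shell literally generates: FakeConf is a hypothetical stand-in for Configuration (it does not extend Serializable), and Holder plays the role of the enclosing object that gets serialized along with the closure. Java serialization walks every non-transient field, so one non-serializable field is enough to fail the whole write.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object SerializationDemo {
  // Hypothetical stand-in for org.apache.hadoop.conf.Configuration:
  // an ordinary class that does not implement Serializable.
  class FakeConf { val delimiter = "\\ |\n" }

  // Serializable wrapper with a non-serializable field.
  class Holder extends Serializable {
    val conf = new FakeConf
    val fieldSep = "_\\|"
  }

  // Same wrapper, but the field is marked @transient, so Java
  // serialization skips it (it would be null after deserialization).
  class TransientHolder extends Serializable {
    @transient val conf = new FakeConf
    val fieldSep = "_\\|"
  }

  // Returns the serialized size, or the exception message on failure.
  def trySerialize(obj: AnyRef): Either[String, Int] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try {
      out.writeObject(obj)
      out.flush()
      Right(bytes.size())
    } catch {
      case e: NotSerializableException => Left(e.getMessage)
    } finally {
      out.close()
    }
  }
}
```

Here SerializationDemo.trySerialize(new SerializationDemo.Holder) comes back as a Left whose message names FakeConf, while the TransientHolder variant serializes successfully.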

You can do one of two things: 1. register Configuration with a Kryo serializer, or 2. mark your conf variable as @transient, which excludes it from serialization so that it is not shipped with the closure.

scala> @transient val conf = new Configuration
conf: org.apache.hadoop.conf.Configuration = Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml

scala> val raw_data = sc.newAPIHadoopFile("../test.txt", classOf[TextInputFormat], classOf[LongWritable], classOf[Text], conf).map(_._2.toString)
14/11/28 00:54:03 INFO MemoryStore: ensureFreeSpace(32937) called with curMem=70594, maxMem=278302556
14/11/28 00:54:03 INFO MemoryStore: Block broadcast_4 stored as values in memory (estimated size 32.2 KB, free 265.3 MB)
raw_data: org.apache.spark.rdd.RDD[String] = MappedRDD[5] at map at <console>:18

scala> val data = raw_data.filter{x => x.split(FIELD_SEP).size >= 3}
data: org.apache.spark.rdd.RDD[String] = FilteredRDD[6] at filter at <console>:22

scala> data.count
14/11/28 00:54:16 INFO FileInputFormat: Total input paths to process : 1
14/11/28 00:54:16 INFO SparkContext: Starting job: count at <console>:25
14/11/28 00:54:16 INFO DAGScheduler: Got job 2 (count at <console>:25) with 1 output partitions (allowLocal=false)
14/11/28 00:54:16 INFO DAGScheduler: Final stage: Stage 2(count at <console>:25)
14/11/28 00:54:16 INFO DAGScheduler: Parents of final stage: List()
14/11/28 00:54:16 INFO DAGScheduler: Missing parents: List()
14/11/28 00:54:16 INFO DAGScheduler: Submitting Stage 2 (FilteredRDD[6] at filter at <console>:22), which has no missing parents
14/11/28 00:54:16 INFO MemoryStore: ensureFreeSpace(4488) called with curMem=103531, maxMem=278302556
14/11/28 00:54:16 INFO MemoryStore: Block broadcast_5 stored as values in memory (estimated size 4.4 KB, free 265.3 MB)
14/11/28 00:54:16 INFO DAGScheduler: Submitting 1 missing tasks from Stage 2 (FilteredRDD[6] at filter at <console>:22)
14/11/28 00:54:16 INFO TaskSchedulerImpl: Adding task set 2.0 with 1 tasks
14/11/28 00:54:16 INFO TaskSetManager: Starting task 0.0 in stage 2.0 (TID 2, localhost, PROCESS_LOCAL, 1223 bytes)
14/11/28 00:54:16 INFO Executor: Running task 0.0 in stage 2.0 (TID 2)
14/11/28 00:54:16 INFO NewHadoopRDD: Input split: file:/Users/ssimanta/spark/test.txt:0+123
14/11/28 00:54:16 INFO Executor: Finished task 0.0 in stage 2.0 (TID 2). 1731 bytes result sent to driver
14/11/28 00:54:16 INFO TaskSetManager: Finished task 0.0 in stage 2.0 (TID 2) in 19 ms on localhost (1/1)
14/11/28 00:54:16 INFO DAGScheduler: Stage 2 (count at <console>:25) finished in 0.019 s
14/11/28 00:54:16 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool 
14/11/28 00:54:16 INFO DAGScheduler: Job 2 finished: count at <console>:25, took 0.025300 s
res5: Long = 1

scala> data.collect
14/11/28 00:55:16 INFO SparkContext: Starting job: collect at <console>:25
14/11/28 00:55:16 INFO DAGScheduler: Got job 3 (collect at <console>:25) with 1 output partitions (allowLocal=false)
14/11/28 00:55:16 INFO DAGScheduler: Final stage: Stage 3(collect at <console>:25)
14/11/28 00:55:16 INFO DAGScheduler: Parents of final stage: List()
14/11/28 00:55:16 INFO DAGScheduler: Missing parents: List()
14/11/28 00:55:16 INFO DAGScheduler: Submitting Stage 3 (FilteredRDD[6] at filter at <console>:22), which has no missing parents
14/11/28 00:55:16 INFO MemoryStore: ensureFreeSpace(4504) called with curMem=108019, maxMem=278302556
14/11/28 00:55:16 INFO MemoryStore: Block broadcast_6 stored as values in memory (estimated size 4.4 KB, free 265.3 MB)
14/11/28 00:55:16 INFO DAGScheduler: Submitting 1 missing tasks from Stage 3 (FilteredRDD[6] at filter at <console>:22)
14/11/28 00:55:16 INFO TaskSchedulerImpl: Adding task set 3.0 with 1 tasks
14/11/28 00:55:16 INFO TaskSetManager: Starting task 0.0 in stage 3.0 (TID 3, localhost, PROCESS_LOCAL, 1223 bytes)
14/11/28 00:55:16 INFO Executor: Running task 0.0 in stage 3.0 (TID 3)
14/11/28 00:55:16 INFO NewHadoopRDD: Input split: file:/Users/ssimanta/spark/test.txt:0+123
14/11/28 00:55:16 INFO Executor: Finished task 0.0 in stage 3.0 (TID 3). 1717 bytes result sent to driver
14/11/28 00:55:16 INFO TaskSetManager: Finished task 0.0 in stage 3.0 (TID 3) in 16 ms on localhost (1/1)
14/11/28 00:55:16 INFO DAGScheduler: Stage 3 (collect at <console>:25) finished in 0.017 s
14/11/28 00:55:16 INFO TaskSchedulerImpl: Removed TaskSet 3.0, whose tasks have all completed, from pool 
14/11/28 00:55:16 INFO DAGScheduler: Job 3 finished: collect at <console>:25, took 0.021439 s
res6: Array[String] = Array(MMMM_|NNNNN_|OOOOO\ |)
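
Note that the transcript above does not show a conf.set("textinputformat.record.delimiter", ...) call on the new conf, which would explain why the collected record still ends in "\ |"; the call from the question is still needed. Another way to keep the Configuration out of the closure, sketched here under assumptions, is to make it a local variable inside a method, so that it never becomes a field of the shell's line object. The names DelimitedInput and readDelimited are hypothetical; the Spark and Hadoop calls are the same ones used in the question, plus Configuration's copy constructor and sc.hadoopConfiguration.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// Hypothetical helper; a sketch, not an official Spark API.
object DelimitedInput {
  def readDelimited(sc: SparkContext, path: String, delimiter: String): RDD[String] = {
    // Local copy of the context's Hadoop settings; being local to this
    // method, it is not a member of any object a later closure refers to.
    val conf = new Configuration(sc.hadoopConfiguration)
    conf.set("textinputformat.record.delimiter", delimiter)
    sc.newAPIHadoopFile(path, classOf[TextInputFormat],
        classOf[LongWritable], classOf[Text], conf)
      // Copy each record out of the reused Text object right away.
      .map { case (_, text) => text.toString }
  }
}
```

In spark-shell this would be used as val raw_data = DelimitedInput.readDelimited(sc, "mydata.txt", "\\ |\n"), after which the filter from the question can be applied unchanged.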