Luke Luke - 29 days ago 16
Scala Question

Spark submit hangs after writing from Cassandra to JSON

I have a driver program where I read data in from Cassandra using Spark, perform some operations, and then write the results out as JSON to S3. The program runs fine when I use Spark 1.6.1 and spark-cassandra-connector 1.6.0-M1.

However, after upgrading to Spark 2.0.1 (Hadoop 2.7.1) and spark-cassandra-connector 2.0.0-M3, the program completes in the sense that all the expected files are written to S3, but it never terminates.

I do call sc.stop() at the end of the program. I am also using Mesos 1.0.1, and in both cases I use the default output committer.

Edit: Looking at the thread dumps below, it seems like it could be waiting on org.apache.hadoop.fs.FileSystem$Statistics$StatisticsDataReferenceCleaner.


Code snippet:

import com.datastax.spark.connector._
import org.apache.hadoop.io.compress.GzipCodec

// get MongoDB oplog operations stored in Cassandra
val operations = sc.cassandraTable[JsonOperation](keyspace, namespace)
  .where("ts >= ? AND ts < ?", minTimestamp, maxTimestamp)

// replay oplog operations into documents; apply(ops) is the program's own
// replay function and may return something other than a Document
val documents = operations
  .spanBy(op => op.id)
  .map { case (id, ops) => (id, apply(ops)) }
  .collect { case (id, document: Document) => MergedDocument(id = id, document = document) }

// write documents to JSON on S3
documents
  .map(document => document.toJson)
  .coalesce(partitions)
  .saveAsTextFile(path, classOf[GzipCodec])
sc.stop()
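To make the replay step easier to reason about outside Spark, here is a local-collections sketch of what spanBy followed by the filter/map does. It assumes, as spanBy does, that rows with the same id arrive next to each other (for example because id is the Cassandra partition key). The Op, Document, Deleted and MergedDocument types and the replay function are hypothetical stand-ins for the question's JsonOperation, Document, MergedDocument and apply, not the real classes.

```scala
object ReplaySketch {
  // Hypothetical stand-ins for the question's types (not the real classes).
  sealed trait Result
  final case class Document(fields: Map[String, String]) extends Result
  case object Deleted extends Result

  final case class Op(id: String, kind: String, field: String, value: String)
  final case class MergedDocument(id: String, document: Document)

  /** Groups consecutive elements that share a key, similar to the connector's spanBy:
    * only adjacent elements are grouped, so the input must already be ordered by key. */
  def spanBy[K, V](xs: Seq[V])(key: V => K): Seq[(K, Seq[V])] =
    xs.foldLeft(Vector.empty[(K, Vector[V])]) { (acc, x) =>
      val k = key(x)
      acc.lastOption match {
        case Some((lastK, group)) if lastK == k => acc.init :+ (k -> (group :+ x))
        case _                                  => acc :+ (k -> Vector(x))
      }
    }

  /** Hypothetical replay: "set" adds a field, "delete" marks the document as deleted. */
  def replay(ops: Seq[Op]): Result =
    ops.foldLeft(Document(Map.empty): Result) {
      case (Document(fields), Op(_, "set", f, v)) => Document(fields + (f -> v))
      case (_, Op(_, "delete", _, _))              => Deleted
      case (Deleted, Op(_, "set", f, v))           => Document(Map(f -> v))
      case (result, _)                             => result
    }

  /** Group by id, replay each group, and keep only results that are still documents. */
  def mergedDocuments(ops: Seq[Op]): Seq[MergedDocument] =
    spanBy(ops)(_.id)
      .map { case (id, group) => (id, replay(group)) }
      .collect { case (id, document: Document) => MergedDocument(id, document) }
}
```

Using collect with a type pattern does the isInstanceOf check and the cast in one step, which is why the Spark snippet above can use the same shape on the RDD.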


Thread dump on the driver:

60 context-cleaner-periodic-gc TIMED_WAITING
46 dag-scheduler-event-loop WAITING
4389 DestroyJavaVM RUNNABLE
12 dispatcher-event-loop-0 WAITING
13 dispatcher-event-loop-1 WAITING
14 dispatcher-event-loop-2 WAITING
15 dispatcher-event-loop-3 WAITING
47 driver-revive-thread TIMED_WAITING
3 Finalizer WAITING
82 ForkJoinPool-1-worker-17 WAITING
43 heartbeat-receiver-event-loop-thread TIMED_WAITING
93 java-sdk-http-connection-reaper TIMED_WAITING
4387 java-sdk-progress-listener-callback-thread WAITING
25 map-output-dispatcher-0 WAITING
26 map-output-dispatcher-1 WAITING
27 map-output-dispatcher-2 WAITING
28 map-output-dispatcher-3 WAITING
29 map-output-dispatcher-4 WAITING
30 map-output-dispatcher-5 WAITING
31 map-output-dispatcher-6 WAITING
32 map-output-dispatcher-7 WAITING
48 MesosCoarseGrainedSchedulerBackend-mesos-driver RUNNABLE
44 netty-rpc-env-timeout TIMED_WAITING
92 org.apache.hadoop.fs.FileSystem$Statistics$StatisticsDataReferenceCleaner WAITING
62 pool-19-thread-1 TIMED_WAITING
2 Reference Handler WAITING
61 Scheduler-1112394071 TIMED_WAITING
20 shuffle-server-0 RUNNABLE
55 shuffle-server-0 RUNNABLE
21 shuffle-server-1 RUNNABLE
56 shuffle-server-1 RUNNABLE
22 shuffle-server-2 RUNNABLE
57 shuffle-server-2 RUNNABLE
23 shuffle-server-3 RUNNABLE
58 shuffle-server-3 RUNNABLE
4 Signal Dispatcher RUNNABLE
59 Spark Context Cleaner TIMED_WAITING
9 SparkListenerBus WAITING
35 SparkUI-35-selector-ServerConnectorManager@651d3734/0 RUNNABLE
36 SparkUI-36-acceptor-0@467924cb-ServerConnector@3b5eaf92{HTTP/1.1}{0.0.0.0:4040} RUNNABLE
37 SparkUI-37-selector-ServerConnectorManager@651d3734/1 RUNNABLE
38 SparkUI-38 TIMED_WAITING
39 SparkUI-39 TIMED_WAITING
40 SparkUI-40 TIMED_WAITING
41 SparkUI-41 RUNNABLE
42 SparkUI-42 TIMED_WAITING
438 task-result-getter-0 WAITING
450 task-result-getter-1 WAITING
489 task-result-getter-2 WAITING
492 task-result-getter-3 WAITING
75 threadDeathWatcher-2-1 TIMED_WAITING
45 Timer-0 WAITING


Thread dump on the executors. It's the same on all of them:

24 dispatcher-event-loop-0 WAITING
25 dispatcher-event-loop-1 WAITING
26 dispatcher-event-loop-2 RUNNABLE
27 dispatcher-event-loop-3 WAITING
39 driver-heartbeater TIMED_WAITING
3 Finalizer WAITING
58 java-sdk-http-connection-reaper TIMED_WAITING
75 java-sdk-progress-listener-callback-thread WAITING
1 main TIMED_WAITING
33 netty-rpc-env-timeout TIMED_WAITING
55 org.apache.hadoop.fs.FileSystem$Statistics$StatisticsDataReferenceCleaner WAITING
59 pool-17-thread-1 TIMED_WAITING
2 Reference Handler WAITING
28 shuffle-client-0 RUNNABLE
35 shuffle-client-0 RUNNABLE
41 shuffle-client-0 RUNNABLE
37 shuffle-server-0 RUNNABLE
5 Signal Dispatcher RUNNABLE
23 threadDeathWatcher-2-1 TIMED_WAITING
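The DestroyJavaVM thread in the driver dump is a useful hint: it typically means main() has already returned and the JVM is waiting for every remaining non-daemon thread to finish. A small diagnostic, sketched below and not part of the original program, is to log the live non-daemon threads right after sc.stop(); any thread it prints is a candidate for what keeps the driver alive. It uses only the JDK's Thread.getAllStackTraces, so it does not depend on the Spark version. The object and method names are hypothetical.

```scala
object NonDaemonThreads {
  /** Returns "id name state" for every live non-daemon thread except the caller. */
  def liveNonDaemonThreads(): Seq[String] = {
    val current = Thread.currentThread()
    Thread.getAllStackTraces.keySet
      .toArray(new Array[Thread](0))
      .toSeq
      .filter(t => t.isAlive && !t.isDaemon && t != current)
      .map(t => s"${t.getId} ${t.getName} ${t.getState}")
      .sorted
  }

  def main(args: Array[String]): Unit = {
    // In the real driver, run the job and call sc.stop() before this line.
    liveNonDaemonThreads().foreach(println)
  }
}
```

If the list is non-empty after sc.stop(), the printed names can be matched against the dump above to see which component started them.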

Answer

I solved this by updating the following dependencies in my program jar (the jar was still being built against Spark 2.0.0 even though the cluster ran 2.0.1):

  • spark 2.0.0 to 2.0.1
  • json4s 3.2.11 to 3.5.0
  • scallop 2.0.1 to 2.0.5
  • nscala-time 1.8.0 to 2.14.0
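For reference, a sketch of what the updated dependencies might look like in build.sbt. The exact modules are assumptions (for example, which json4s module the program actually uses is not stated). Marking Spark as "provided" is a common way to make sure the jar runs against the cluster's Spark version instead of bundling a different one; it is a suggestion here, not something the answer confirms.

```scala
// build.sbt (sketch; json4s-jackson and the "provided" scope are assumptions)
scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  // Spark is supplied by the cluster at runtime, so it is not bundled into the jar
  "org.apache.spark"       %% "spark-core"                % "2.0.1" % "provided",
  "com.datastax.spark"     %% "spark-cassandra-connector" % "2.0.0-M3",
  "org.json4s"             %% "json4s-jackson"            % "3.5.0",
  "org.rogach"             %% "scallop"                   % "2.0.5",
  "com.github.nscala-time" %% "nscala-time"               % "2.14.0"
)
```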