T. Bombeke - 2 months ago
Scala Question

Why does transform do side effects (println) only once in Structured Streaming?

Why is the output of the select statement printed every batch, but "hello world" printed only once?

import org.apache.spark.sql.types._

val schema = StructType(
  StructField("id", LongType, nullable = false) ::
  StructField("name", StringType, nullable = false) ::
  StructField("score", DoubleType, nullable = false) :: Nil)

val in: DataFrame = sparkSession.readStream
  .schema(schema)
  .format("csv")
  .option("header", false)
  .option("maxFilesPerTrigger", 1)
  .option("delimiter", ";")
  .load("s3://xxxxxxxx")

val input: DataFrame = in.select("*")
  .transform { ds =>
    println("hello world") // <-- Why is this printed out once?
    ds
  }

import org.apache.spark.sql.streaming.StreamingQuery

val query: StreamingQuery = input.writeStream
  .format("console")
  .start

Answer

I'm on Spark 2.1.0-SNAPSHOT here (built today), but I believe the behavior hasn't changed between 2.0 and now.

$ ./bin/spark-submit --version
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.0-SNAPSHOT
      /_/

Branch master
Compiled by user jacek on 2016-09-30T07:08:39Z
Revision 1fad5596885aab8b32d2307c0edecbae50d5bd7a
Url https://github.com/apache/spark.git
Type --help for more information.

In Spark's Structured Streaming, a streaming application is essentially a mechanism for applying the same physical query plan to the input data sources on every trigger.

Please note that the physical query plan is what defines your Dataset (and the more I work with Spark SQL, the less difference I see between queries and Datasets -- they're simply interchangeable these days).

When you describe a structured query (regardless of whether it is going to be a one-off or a streaming query), it goes through four stages: parsing, analysis, optimization, and finally physical planning. You can review them using the explain(extended = true) method.

scala> input.explain(extended = true)
== Parsed Logical Plan ==
StreamingRelation DataSource(org.apache.spark.sql.SparkSession@5c4f07c1,json,List(),Some(StructType(StructField(id,LongType,false), StructField(name,StringType,false), StructField(score,DoubleType,false))),List(),None,Map(path -> input-json)), FileSource[input-json], [id#15L, name#16, score#17]

== Analyzed Logical Plan ==
id: bigint, name: string, score: double
StreamingRelation DataSource(org.apache.spark.sql.SparkSession@5c4f07c1,json,List(),Some(StructType(StructField(id,LongType,false), StructField(name,StringType,false), StructField(score,DoubleType,false))),List(),None,Map(path -> input-json)), FileSource[input-json], [id#15L, name#16, score#17]

== Optimized Logical Plan ==
StreamingRelation DataSource(org.apache.spark.sql.SparkSession@5c4f07c1,json,List(),Some(StructType(StructField(id,LongType,false), StructField(name,StringType,false), StructField(score,DoubleType,false))),List(),None,Map(path -> input-json)), FileSource[input-json], [id#15L, name#16, score#17]

== Physical Plan ==
StreamingRelation FileSource[input-json], [id#15L, name#16, score#17]

The stages are lazy and executed only once.

Once you've got the physical plan, the stages won't be executed again. Your Dataset pipeline is already computed; the only missing piece is the data flowing through the pipe.

That's why you see "hello world" only once -- it was printed when the streaming query was planned, i.e. when transform was applied while constructing the Dataset. That construction happens exactly once, so any side effects in the transform body are triggered at that point, not per batch.
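In other words, Dataset.transform(t) is plain, eager function application -- it simply evaluates t(this) -- so its body runs while you build the query, not while the query runs. A minimal Scala sketch (no Spark required; the Query class below is a hypothetical stand-in for Dataset) makes the timing visible:

```scala
// Hypothetical stand-in for Dataset: transform(f) applies f immediately,
// mirroring how Dataset.transform is plain function application.
final case class Query(steps: List[String]) {
  def transform(f: Query => Query): Query = f(this) // eager: runs at construction time
  def run(batch: Int): String = {                   // stands in for processing one trigger
    val out = s"batch $batch: ${steps.mkString(" -> ")}"
    println(out)
    out
  }
}

var bodyRuns = 0
val q = Query(List("select *")).transform { ds =>
  bodyRuns += 1                 // side effect: happens once, right here, at construction
  println("hello world")
  ds.copy(steps = ds.steps :+ "transformed")
}

// Re-running the already-built plan never re-invokes the transform body.
q.run(1)
q.run(2)
// bodyRuns is still 1
```

If you want a side effect on every batch, it has to live inside the data flow (for example in a per-row operation or in the sink), not in the code that builds the plan.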

An interesting case. Thanks a lot for bringing it up here!
