user2811630 - 2 months ago

Scala Question

select / drop does not really drop the column?

I think I don't understand how select and drop work.

I am exploding a dataset and I don't want some of the columns to be copied to the newly generated entries.

// In spark-shell, `spark` and its implicits are already in scope; in a
// standalone application you also need: import spark.implicits._
val ds = spark.sparkContext.parallelize(Seq(
("2017-01-01 06:15:00", "ASC_a", "1"),
("2017-01-01 06:19:00", "start", "2"),
("2017-01-01 06:22:00", "ASC_b", "2"),
("2017-01-01 06:30:00", "end", "2"),
("2017-01-01 10:45:00", "ASC_a", "3"),
("2017-01-01 10:50:00", "start", "3"),
("2017-01-01 11:22:00", "ASC_c", "4"),
("2017-01-01 11:31:00", "end", "5" )
)).toDF("timestamp", "status", "msg")
ds.show()

val foo = ds.select($"timestamp", $"msg")
val bar = ds.drop($"status")
foo.printSchema()
bar.printSchema()
println("foo " + foo.where($"status" === "end").count)
println("bar " + bar.where($"status" === "end").count)


Output:


root
|-- timestamp: string (nullable = true)
|-- msg: string (nullable = true)

root
|-- timestamp: string (nullable = true)
|-- msg: string (nullable = true)


foo 2

bar 2


Why do I still get a count of 2 for both, even though I

a) did not select status

b) dropped status

EDIT:

println("foo " + foo.where(foo.col("status") === "end").count)
fails, saying that there is no column status. Should this not be the same as
println("foo " + foo.where($"status" === "end").count)
?

Answer

Why do I still get an output of 2 for both

Because the optimizer is free to reorganize the execution plan. In fact, if you check it:

== Physical Plan ==
*Project [_1#4 AS timestamp#8, _3#6 AS msg#10]
+- *Filter (isnotnull(_2#5) && (_2#5 = end))
   +- *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple3, true])._1, true) AS _1#4, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple3, true])._2, true) AS _2#5, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple3, true])._3, true) AS _3#6]
      +- Scan ExternalRDDScan[obj#3]

you'll see that the filter is pushed down as early as possible and executed before the projection: `$"status"` is an unresolved column, so it is resolved during analysis against the full lineage of foo, where status (attribute _2#5) is still present; only afterwards are the rows projected down to timestamp and msg. Arguably this is a minor bug, and the code should throw an exception instead.
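If you need the dropped column to be truly unreferencable, one common workaround (a sketch, not part of the original answer; it assumes a local SparkSession and reuses a trimmed-down version of the data above) is to break the lineage by rebuilding the DataFrame from its RDD and schema:

```scala
import org.apache.spark.sql.SparkSession

// Sketch: break the lineage so the analyzer can no longer "see" the
// dropped column. The session setup and names are illustrative.
val spark = SparkSession.builder.master("local[*]").appName("drop-demo").getOrCreate()
import spark.implicits._

val ds = Seq(
  ("2017-01-01 06:19:00", "start", "2"),
  ("2017-01-01 06:30:00", "end", "2")
).toDF("timestamp", "status", "msg")

val foo = ds.select($"timestamp", $"msg")

// `foo` still carries lineage back to `ds`, so $"status" can be resolved
// during analysis even though it is not in foo's schema.
// (foo.col("status"), by contrast, resolves eagerly against foo's schema
// and fails immediately -- which explains the behaviour in the EDIT.)
// Rebuilding from the RDD + schema discards that lineage:
val detached = spark.createDataFrame(foo.rdd, foo.schema)

// Now filtering on the dropped column fails at analysis time with an
// org.apache.spark.sql.AnalysisException:
// detached.where($"status" === "end").count
```

Note that `foo.rdd` forces the data through an RDD, so this workaround has a materialization cost; it is a trade-off, not a free fix.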