fingerspitzen - 4 months ago
Scala Question

Spark multiple dataframe saves

I have a spark job that creates a data frame that I save down to HDFS. What I would like to do is to save a subset of that data frame to another place, but I want to be performant about this.

The only action I have is the save itself; every other step in the Spark job is a transformation. I do not cache the data frame. I am concerned that applying drop transformations to a new data frame derived from the old one will run all of the original data frame's transformations again.

For example, I have something like:

val df = hiveContext.read.json("hdfs://HOSTNAME:PORT/user/spark/data/in/*")

val df2 = df.withColumn("new_column", some_udf("old_column")).drop("old_column")
.
.
.

val final_df = df10.withColumn("newest_column", another_udf("old_column2")).drop("old_column2")

val subset_df = final_df.drop("this_column")
.drop("that_column")
.drop("another_column")

final_df.write.mode(SaveMode.Overwrite).format("json").save(hdfs_dir)
subset_df.write.mode(SaveMode.Overwrite).format("json").save(hdfs_dir2)


But let's assume that some_udf is actually really compute intensive. I don't want it to run twice. Thus my question is:

Should I call final_df.cache() before I declare subset_df and call the saves, to make sure that the UDF transformation is not performed again?

Something like:

val df = hiveContext.read.json("hdfs://HOSTNAME:PORT/user/spark/data/in/*")

val df2 = df.withColumn("new_column", some_udf("old_column")).drop("old_column")
.
.
.

val final_df = df10.withColumn("newest_column", another_udf("old_column2")).drop("old_column2")

val subset_df = final_df.drop("this_column")
.drop("that_column")
.drop("another_column")

final_df.cache() // This is the only new line

final_df.write.mode(SaveMode.Overwrite).format("json").save(hdfs_dir)
subset_df.write.mode(SaveMode.Overwrite).format("json").save(hdfs_dir2)

Answer

You should cache:

val final_df = df10.withColumn(...)
val subset_df = final_df.drop(...)
final_df.cache() 

before the first action; otherwise the whole lineage, including the UDFs, will execute twice, once per save (as you suspect).
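To make the pattern concrete, here is a minimal, self-contained sketch of caching before two saves. It is an illustration under assumptions, not the asker's actual job: it assumes Spark 2.x or later (so it uses SparkSession rather than the hiveContext from the question), runs in local mode, and the object name CacheBeforeTwoSaves, the toy input, the stand-in UDF someUdf, and the temporary output directory are all hypothetical. It also unpersists the data frame after both saves, which the answer above does not mention but which is usually worth doing once the cached data is no longer needed.

```scala
import java.nio.file.Files

import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.functions.{col, udf}

object CacheBeforeTwoSaves {

  // Runs the sketch and returns the (hypothetical) temporary output directory.
  def run(): String = {
    // Assumption: Spark 2.x+ in local mode, for illustration only.
    val spark = SparkSession.builder()
      .appName("cache-before-two-saves")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical stand-in for the compute-intensive UDF in the question.
    val someUdf = udf((s: String) => s.toUpperCase)

    // Hypothetical input; the question reads JSON from HDFS instead.
    val df = Seq(("a", 1, "x"), ("b", 2, "y"))
      .toDF("old_column", "this_column", "that_column")

    val finalDf = df
      .withColumn("new_column", someUdf(col("old_column")))
      .drop("old_column")

    val subsetDf = finalDf.drop("this_column").drop("that_column")

    // Mark finalDf for caching before the first action. cache() is itself lazy:
    // the first save materializes the data, the second save can reuse it.
    finalDf.cache()

    val outDir = Files.createTempDirectory("cache_demo").toString
    finalDf.write.mode(SaveMode.Overwrite).format("json").save(s"$outDir/full")
    subsetDf.write.mode(SaveMode.Overwrite).format("json").save(s"$outDir/subset")

    // Release the cached blocks once both saves are done.
    finalDf.unpersist()
    spark.stop()
    outDir
  }

  def main(args: Array[String]): Unit = {
    println(s"Wrote output under ${run()}")
  }
}
```

Because cache() only marks the data frame for caching, the order that matters is that it is called before the first save; declaring subset_df before or after the cache() call makes no difference as long as no action has run on it yet.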