Steve Steve -4 years ago 194
Scala Question

Spark Dataframe Random UUID changes after every transformation/action

I have a Spark dataframe with a column that includes a generated UUID.
However, each time I run an action or apply a transformation to the dataframe, the UUID values change.

How do I generate the UUID only once and have it remain static thereafter?

Some sample code to reproduce my issue is below:

import java.util.UUID

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf

def process(spark: SparkSession): Unit = {

  import spark.implicits._

  val sc = spark.sparkContext
  val sqlContext = spark.sqlContext
  sc.setLogLevel("OFF")

  // create dataframe
  val df = spark.createDataset(Array(("a", "1"), ("b", "2"), ("c", "3"))).toDF("col1", "col2")
  df.createOrReplaceTempView("df")
  df.show(false)

  // define a UDF that creates a random UUID
  val generateUUID = udf(() => UUID.randomUUID().toString)

  // generate UUID for new column
  val dfWithUuid = df.withColumn("new_uuid", generateUUID())
  dfWithUuid.show(false)
  dfWithUuid.show(false) // uuid is different

  // new transformations also change the uuid
  val dfWithUuidWithNewCol = dfWithUuid.withColumn("col3", df.col("col2") + 1)
  dfWithUuidWithNewCol.show(false)
}


The output is:

+----+----+
|col1|col2|
+----+----+
|a   |1   |
|b   |2   |
|c   |3   |
+----+----+

+----+----+------------------------------------+
|col1|col2|new_uuid                            |
+----+----+------------------------------------+
|a   |1   |a414e73b-24b8-4f64-8d21-f0bc56d3d290|
|b   |2   |f37935e5-0bfc-4863-b6dc-897662307e0a|
|c   |3   |e3aaf655-5a48-45fb-8ab5-22f78cdeaf26|
+----+----+------------------------------------+

+----+----+------------------------------------+
|col1|col2|new_uuid                            |
+----+----+------------------------------------+
|a   |1   |1c6597bf-f257-4e5f-be81-34a0efa0f6be|
|b   |2   |6efe4453-29a8-4b7f-9fa1-7982d2670bd6|
|c   |3   |2f7ddc1c-3e8c-4118-8e2c-8a6f526bee7e|
+----+----+------------------------------------+

+----+----+------------------------------------+----+
|col1|col2|new_uuid                            |col3|
+----+----+------------------------------------+----+
|a   |1   |00b85af8-711e-4b59-82e1-8d8e59d4c512|2.0 |
|b   |2   |94c3f2c6-9234-4fb3-b1c4-273a37171131|3.0 |
|c   |3   |1059fff2-b8f9-4cec-907d-ea181d5003a2|4.0 |
+----+----+------------------------------------+----+


Note that the UUID is different at each step.

Answer Source

This is expected behavior. User-defined functions have to be deterministic:

The user-defined functions must be deterministic. Due to optimization, duplicate invocations may be eliminated or the function may even be invoked more times than it is present in the query.

If you want to include a non-deterministic function and preserve its output, you should write the intermediate data to persistent storage and read it back. Checkpointing or caching may work in some simple cases, but it won't be reliable in general.
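A minimal sketch of the write-and-read-back approach, assuming a local SparkSession and a temporary directory as the storage location. The object name PersistUuidSketch, the helper materialize, and the path are hypothetical, chosen only for illustration; in a real job the path would point at durable storage such as HDFS or S3.

```scala
import java.nio.file.Files
import java.util.UUID

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.udf

object PersistUuidSketch {

  // Hypothetical helper: writes the DataFrame (generated UUIDs included) to
  // Parquet and returns a DataFrame that reads the stored rows back, so later
  // actions no longer re-run the non-deterministic UDF.
  def materialize(spark: SparkSession, df: DataFrame, path: String): DataFrame = {
    df.write.mode("overwrite").parquet(path)
    spark.read.parquet(path)
  }

  def main(args: Array[String]): Unit = {
    // Assumption: local mode, for demonstration only.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("persist-uuid-sketch")
      .getOrCreate()
    import spark.implicits._

    val generateUUID = udf(() => UUID.randomUUID().toString)
    val df = Seq(("a", "1"), ("b", "2"), ("c", "3")).toDF("col1", "col2")

    // Assumption: a scratch directory stands in for persistent storage.
    val path = Files.createTempDirectory("uuid-sketch").resolve("with_uuid").toString

    val stable = materialize(spark, df.withColumn("new_uuid", generateUUID()), path)

    stable.show(false)
    stable.show(false) // reads the stored values; the UDF is not invoked again

    // Downstream transformations build on the stored UUIDs.
    stable.withColumn("col3", $"col2" + 1).show(false)

    spark.stop()
  }
}
```

The UDF runs once, during the write. Everything derived from the returned DataFrame reads the files, so the row order may vary between actions but the UUID attached to each row does not.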

If the upstream process is deterministic (for starters, there is no shuffle), you could try to use the rand function with a seed, convert the result to a byte array, and pass it to UUID.nameUUIDFromBytes.
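The seeded-rand idea could look roughly like the sketch below. The object name SeededUuidSketch, the helper uuidFromDouble, and the seed 42 are hypothetical. Encoding the double as 8 big-endian bytes via ByteBuffer is just one possible byte conversion, not something the answer prescribes. Note that rand(seed) only reproduces the same values when the partitioning and row order of the input are the same, and that UUIDs derived from a single double are not guaranteed to be unique.

```scala
import java.nio.ByteBuffer
import java.util.UUID

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{rand, udf}

object SeededUuidSketch {

  // Hypothetical helper: encodes the double as 8 bytes and derives a
  // name-based UUID from them. The same input always gives the same UUID.
  def uuidFromDouble(d: Double): String =
    UUID.nameUUIDFromBytes(ByteBuffer.allocate(8).putDouble(d).array()).toString

  def main(args: Array[String]): Unit = {
    // Assumption: local mode, for demonstration only.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("seeded-uuid-sketch")
      .getOrCreate()
    import spark.implicits._

    val df = Seq(("a", "1"), ("b", "2"), ("c", "3")).toDF("col1", "col2")

    // This UDF is deterministic: its output depends only on its argument.
    val toUuid = udf((d: Double) => uuidFromDouble(d))

    // rand(42L) is seeded, so re-evaluating the plan over the same
    // partitions in the same row order regenerates the same values.
    val dfWithUuid = df.withColumn("new_uuid", toUuid(rand(42L)))

    dfWithUuid.show(false)
    dfWithUuid.show(false)

    spark.stop()
  }
}
```

Here the only source of randomness is the seeded rand column, and the UDF itself satisfies the determinism requirement quoted above.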

See also: About how to add a new column to an existing DataFrame with random values in Scala
