karoma - 1 month ago
Scala Question

Updating some row values in a Spark DataFrame

I have a dataframe which I want to merge into another dataframe, but only to update specific cells rather than replace whole rows.

Old dataframe:

## +---+----+----+
## |key|val1|val2|
## +---+----+----+
## | 1| aa| ab|
## | 2| bb| bc|
## +---+----+----+


New dataframe:

## +---+----+
## |key|val1|
## +---+----+
## | 2| bbb|
## +---+----+


Result:

## +---+----+----+
## |key|val1|val2|
## +---+----+----+
## | 1| aa| ab|
## | 2| bbb| bc|
## +---+----+----+


The key is unique in this case, so the row to be affected will always be identifiable. The old dataframe will also always contain the keys from the new dataframe.

As dataframes are immutable, I'll have to call withColumn to create a new one, presumably by passing in some sort of UDF, but I'm a bit lost when it comes to what that UDF should contain.

Answer

You need to use an outer join to get the expected output:

scala> val oldDf = Seq((1, "aa", "ab"), (2, "bb", "bc")).toDF("key", "val1", "val2").as("old")
// oldDf: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [key: int, val1: string ... 1 more field]
scala> val newDf = Seq((2, "bbb")).toDF("key", "val1").as("new")
// newDf: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [key: int, val1: string]

scala> oldDf.join(newDf, Seq("key"), "outer").select($"key", coalesce($"new.val1", $"old.val1").alias("val1"), $"val2").show
// +---+----+----+
// |key|val1|val2|
// +---+----+----+
// |  1|  aa|  ab|
// |  2| bbb|  bc|
// +---+----+----+

Note: coalesce returns the first non-null value among its arguments, so the updated new.val1 is used whenever the new dataframe supplies one, and old.val1 is kept otherwise.
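If several columns may need updating, the same coalesce pattern can be applied to every non-key column the two frames share. Below is a sketch, not part of the original answer: it assumes a local SparkSession and the same example data, and it uses a left join, which suffices only because the question guarantees the old dataframe contains every key from the new one.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{coalesce, col}

object MergeUpdates {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("merge").getOrCreate()
    import spark.implicits._

    val oldDf = Seq((1, "aa", "ab"), (2, "bb", "bc")).toDF("key", "val1", "val2").as("old")
    val newDf = Seq((2, "bbb")).toDF("key", "val1").as("new")

    // Every column present in both frames (other than the join key) is a
    // candidate for an update; all other columns pass through from oldDf.
    val updatable = newDf.columns.toSet - "key"

    val merged = oldDf.join(newDf, Seq("key"), "left").select(
      oldDf.columns.map {
        case "key"              => col("key")
        case c if updatable(c)  => coalesce(col(s"new.$c"), col(s"old.$c")).alias(c)
        case c                  => col(s"old.$c")
      }: _*
    )

    merged.show()
    spark.stop()
  }
}
```

Joining with Seq("key") keeps a single key column, and the "old"/"new" aliases let coalesce disambiguate the columns that appear in both frames.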

Comments