Scala Question

How to perform a merge operation on a Spark DataFrame?

I have two Spark DataFrames, mainDF and deltaDF, both with a matching schema.

The content of mainDF is as follows:

id | name | age
1 | abc | 23
2 | xyz | 34
3 | pqr | 45


The content of deltaDF is as follows:

id | name | age
1 | lmn | 56
4 | efg | 37


I want to merge deltaDF with mainDF based on the value of id. So if an id already exists in mainDF then the record should be updated, and if the id doesn't exist then a new record should be added. The resulting DataFrame should look like this:

id | name | age
1 | lmn | 56
2 | xyz | 34
3 | pqr | 45
4 | efg | 37


This is my current code, and it works:

val updatedDF = mainDF.as("main")
  .join(deltaDF.as("delta"), $"main.id" === $"delta.id", "inner")
  .select($"main.id", $"main.name", $"main.age") // ids present in both DataFrames, keeping mainDF's values
mainDF = mainDF.except(updatedDF).unionAll(deltaDF) // mainDF must be a var for this reassignment


However, here I need to explicitly provide the list of columns again in the select function, which feels like overhead to me. Is there a better/cleaner approach to achieve the same result?

Answer

If you don't want to provide the list of columns explicitly, you can map over the original DF's columns, something like:

.select(mainDF.columns.map(c => $"main.$c" as c): _*)
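For instance, dropped into the inner-join version from the question, that would look roughly like this (a sketch assuming the same mainDF and deltaDF as above and spark.implicits._ in scope; matchedDF and mergedDF are just illustrative names):

// same inner join as before, but the selected columns are derived
// from mainDF.columns rather than listed by hand
val matchedDF = mainDF.as("main")
  .join(deltaDF.as("delta"), $"main.id" === $"delta.id", "inner")
  .select(mainDF.columns.map(c => $"main.$c" as c): _*)

val mergedDF = mainDF.except(matchedDF).unionAll(deltaDF)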

By the way, you can do this without a union after the join: an outer join also keeps the records that exist in only one of the DataFrames, and you can then use coalesce to "choose" the non-null value, preferring deltaDF's values. So the complete solution would be something like:

val updatedDF = mainDF.as("main")
  .join(deltaDF.as("delta"), $"main.id" === $"delta.id", "outer")
  .select(mainDF.columns.map(c => coalesce($"delta.$c", $"main.$c") as c): _*)

updatedDF.show
// +---+----+---+
// | id|name|age|
// +---+----+---+
// |  1| lmn| 56|
// |  3| pqr| 45|
// |  4| efg| 37|
// |  2| xyz| 34|
// +---+----+---+
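For reference, here is a minimal end-to-end sketch of this outer-join approach, assuming a SparkSession named spark (as in spark-shell) and the sample data from the question:

import org.apache.spark.sql.functions.coalesce
import spark.implicits._ // for the $-notation and toDF

val mainDF  = Seq((1, "abc", 23), (2, "xyz", 34), (3, "pqr", 45)).toDF("id", "name", "age")
val deltaDF = Seq((1, "lmn", 56), (4, "efg", 37)).toDF("id", "name", "age")

val updatedDF = mainDF.as("main")
  .join(deltaDF.as("delta"), $"main.id" === $"delta.id", "outer")
  .select(mainDF.columns.map(c => coalesce($"delta.$c", $"main.$c") as c): _*)

// the join does not guarantee row order, so sort explicitly if needed
updatedDF.orderBy("id").show()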