
Spark conditionally merging 2 dataframes in Scala

I have two DataFrames and need to merge a single column conditionally: whenever the id column of the first DataFrame is null, I need to look up the value from a second DataFrame instead.

df1.show()
+-----+---+----+-----+
|group|cat|  id|value|
+-----+---+----+-----+
|    X|  A|   1| 20.0|
|    X|  A|   2| 20.0|
|    X|  A|null| 55.0|
|    X|  B|   1| 20.0|
|    X|  B|null| 55.0|
|    Y|  A|   3| 20.0|
|    Y|  A|   4| 20.0|
|    Y|  A|null| 55.0|
|    Y|  B|   4| 20.0|
|    Y|  B|null| 55.0|
+-----+---+----+-----+
df2.show()
+---+-------+
|cat|     id|
+---+-------+
|  A|1 2 3 4|
|  B|    1 4|
+---+-------+
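
For reference, the two DataFrames can be recreated roughly like this (a sketch; it assumes a SparkSession named spark with spark.implicits._ in scope, and treats id as a string column since df2 holds space-separated lists):

import spark.implicits._  // assumes an active SparkSession named `spark`

val df1 = Seq(
  ("X", "A", Some("1"), 20.0),
  ("X", "A", Some("2"), 20.0),
  ("X", "A", None,      55.0),
  ("X", "B", Some("1"), 20.0),
  ("X", "B", None,      55.0),
  ("Y", "A", Some("3"), 20.0),
  ("Y", "A", Some("4"), 20.0),
  ("Y", "A", None,      55.0),
  ("Y", "B", Some("4"), 20.0),
  ("Y", "B", None,      55.0)
).toDF("group", "cat", "id", "value")

val df2 = Seq(
  ("A", "1 2 3 4"),
  ("B", "1 4")
).toDF("cat", "id")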


With my final desired result being:

+-----+---+-------+-----+
|group|cat|     id|value|
+-----+---+-------+-----+
|    X|  A|      1| 20.0|
|    X|  A|      2| 20.0|
|    X|  A|1 2 3 4| 55.0|
|    X|  B|      1| 20.0|
|    X|  B|    1 4| 55.0|
|    Y|  A|      3| 20.0|
|    Y|  A|      4| 20.0|
|    Y|  A|1 2 3 4| 55.0|
|    Y|  B|      4| 20.0|
|    Y|  B|    1 4| 55.0|
+-----+---+-------+-----+


I've tried something similar to another answer here (Conditional Join in Spark DataFrame):

val joinCondition = when($"a.id".isNull && $"a.cat" === "b.cat", $"a.id"===$"b.id")
.otherwise($"a.id")

df1.as('a).join(df2.as('b), joinCondition).show


but I always end up with an empty DataFrame.

Any thoughts on the correct join condition?

Answer

You can left-join the two DataFrames and then update id with the id from df2 using the coalesce function:

import org.apache.spark.sql.functions.coalesce

(df1.join(df2.withColumnRenamed("id", "id2"), Seq("cat"), "left")
    .withColumn("id", coalesce($"id", $"id2")).drop("id2").show)

+---+-----+-------+-----+
|cat|group|     id|value|
+---+-----+-------+-----+
|  A|    X|      1| 20.0|
|  A|    X|      2| 20.0|
|  A|    X|1 2 3 4| 55.0|
|  B|    X|      1| 20.0|
|  B|    X|    1 4| 55.0|
|  A|    Y|      3| 20.0|
|  A|    Y|      4| 20.0|
|  A|    Y|1 2 3 4| 55.0|
|  B|    Y|      4| 20.0|
|  B|    Y|    1 4| 55.0|
+---+-----+-------+-----+
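
As for the original join condition: $"a.cat" === "b.cat" compares cat against the literal string "b.cat" (the $ is missing), and the otherwise branch returns an id value rather than true/false, so the when(...) expression never behaves as a boolean join predicate, which would explain the empty result. If you prefer to keep the when/otherwise style, the same left join can resolve the nulls explicitly instead of using coalesce (a sketch, equivalent to the version above):

import org.apache.spark.sql.functions.when

(df1.join(df2.withColumnRenamed("id", "id2"), Seq("cat"), "left")
    .withColumn("id", when($"id".isNull, $"id2").otherwise($"id"))  // same effect as coalesce
    .drop("id2").show)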