Anupam Anupam - 3 years ago 204
Scala Question

Join two data frame and update one data frame records with another

So i have two data frame .
Data Frame 1 like this :

+----------+------+---------+--------+------+
| OrgId|ItemId|segmentId|Sequence|Action|
+----------+------+---------+--------+------+
|4295877341| 136| 4| 1| I|!||
|4295877346| 136| 4| 1| I|!||
|4295877341| 138| 2| 1| I|!||
|4295877341| 141| 4| 1| I|!||
|4295877341| 143| 2| 1| I|!||
|4295877341| 145| 14| 1| I|!||
| 123456789| 145| 14| 1| I|!||
+----------+------+---------+--------+------+


DataFrame2 is like below

+----------+------+-----------+----------+--------+
| OrgId|ItemId|segmentId_1|Sequence_1|Action_1|
+----------+------+-----------+----------+--------+
|4295877343| 149| 15| 2| I|!||
|4295877341| 136| null| null| I|!||
| 123456789| 145| 14| 1| D|!||
|4295877341| 138| 11| 22| I|!||
|4295877341| 141| 10| 1| I|!||
|4295877341| 143| 1| 1| I|!||
+----------+------+-----------+----------+--------+


Now i have to join both data frame update data frame 1 column with matching records with data frame 2 .

Now key in both data frame is OrgId and ItemId.

So the expected output should be .

+----------+------+---------+--------+------+
| OrgId|ItemId|segmentId|Sequence|Action|
+----------+------+---------+--------+------+
|4295877346| 136| 4| 1| I|!||
|4295877341| 145| 14| 1| I|!||
|4295877343| 149| 15| 2| I|!||
|4295877341| 136| null| null| I|!||
|4295877341| 138| 11| 22| I|!||
|4295877341| 141| 10| 1| I|!||
|4295877341| 143| 1| 1| I|!||
+----------+------+---------+--------+------+


So i need to update data frame 1 with data frame 2 records .
If records in data frame 1 is not found in 2 then also we need to retain that records .
If any new records are found in dataframe 2 then that records needs to added in the output

Here is what i am doing ..

val df3 = df1.join(df2, Seq("OrgId", "ItemId"), "outer")
.select($"OrgId", $"ItemId",$"segmentId_1",$"Sequence_1",$"Action_1")
.filter(!$"Action_1".contains("D"))
df3.show()


But i am getting below output .

+----------+------+-----------+----------+--------+
| OrgId|ItemId|segmentId_1|Sequence_1|Action_1|
+----------+------+-----------+----------+--------+
|4295877343| 149| 15| 2| I|!||
|4295877341| 136| null| null| I|!||
|4295877341| 138| 11| 22| I|!||
|4295877341| 141| 10| 1| I|!||
|4295877341| 143| 1| 1| I|!||
+----------+------+-----------+----------+--------+


I am not getting
4295877346| 136| 4| 1| I|!|
record from data frame 1 ...

left_outer gives me below output

+----------+------+-----------+----------+--------+
| OrgId|ItemId|segmentId_1|Sequence_1|Action_1|
+----------+------+-----------+----------+--------+
|4295877341| 136| null| null| I|!||
|4295877341| 138| 11| 22| I|!||
|4295877341| 141| 10| 1| I|!||
|4295877341| 143| 1| 1| I|!||
+----------+------+-----------+----------+--------+

Answer Source

Try this ..

val df3 = df1.join(df2, Seq("OrgId", "ItemId"), "outer")
      .select($"OrgId", $"ItemId",
        when($"segmentId_1".isNotNull, $"segmentId_1").otherwise($"segmentId").as("segmentId"),
        when($"Sequence_1".isNotNull, $"Sequence_1").otherwise($"Sequence").as("Sequence"),
        when($"Action_1".isNotNull, $"Action_1").otherwise($"Action").as("Action")).filter(!$"Action".contains("D"))

    df3.show()
Recommended from our users: Dynamic Network Monitoring from WhatsUp Gold from IPSwitch. Free Download