Scala Question

Join two data frames sequentially, removing duplicates and rows with the Delete action

I have two data frames that I created from two text files.
df1 is here:

+--------------------+----------+------+---------+--------+------+
|           EntryDate|      OgId|ItemId|segmentId|Sequence|Action|
+--------------------+----------+------+---------+--------+------+
|2017-06-07T09:04:...|4295877341|   136|        4|       1|  I|!||
|2017-06-07T09:04:...|4295877346|   136|        4|       1|  I|!||
|2017-06-07T09:04:...|4295877341|   138|        2|       1|  I|!||
|2017-06-07T09:04:...|4295877341|   141|        4|       1|  I|!||
|2017-06-07T09:04:...|4295877341|   143|        2|       1|  I|!||
|2017-06-07T09:04:...|4295877341|   145|       14|       1|  I|!||
|2017-06-07T09:04:...| 123456789|   145|       14|       1|  I|!||
+--------------------+----------+------+---------+--------+------+


df2 is here:

+--------------------+----------+------+---------+--------+------+
|           EntryDate|      OgId|ItemId|segmentId|Sequence|Action|
+--------------------+----------+------+---------+--------+------+
|2017-06-07T09:04:...|4295877341|   136|        4|       1|  I|!||
|2017-06-07T09:05:...|4295877341|   136|        5|       2|  I|!||
|2017-06-07T09:06:...|4295877341|   138|        4|       5|  I|!||
|2017-06-07T09:07:...|4295877341|   141|        9|       1|  I|!||
|2017-06-07T09:08:...|4295877341|   143|     null|       2|  I|!||
|2017-06-07T09:09:...|4295877343|   149|       14|       2|  I|!||
|2017-06-07T09:10:...| 123456789|   145|       14|       1|  D|!||
+--------------------+----------+------+---------+--------+------+


Now I have to join these two data frames in such a way that the final data frame has unique records.

Also, if df2 has a null for any column value, then df1's corresponding value should appear in the final output.

Here the Action tag 'U' is for update and 'D' is for delete.

My final output should look like this:

+----------+------+---------+--------+------+
|      OgId|ItemId|segmentId|Sequence|Action|
+----------+------+---------+--------+------+
|4295877341|   136|        5|       2|  I|!||
|4295877346|   136|        4|       1|  I|!||
|4295877341|   138|        4|       5|  I|!||
|4295877341|   141|        9|       1|  I|!||
|4295877341|   143|        2|       2|  I|!||
|4295877341|   145|       14|       1|  I|!||
|4295877343|   149|       14|       2|  I|!||
+----------+------+---------+--------+------+


My primary key for both data frames is OgId + ItemId.
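In Spark SQL terms, that null-fallback rule is an outer join on the composite key plus a per-column fallback. A minimal sketch, assuming the column names from the tables above and that the missing values arrive as real SQL nulls:

import org.apache.spark.sql.functions._

// Sketch only: alias df2's payload columns, outer-join on the key,
// and fall back to df1's value wherever df2 has none.
val d2 = df2.select($"OgId", $"ItemId",
  $"segmentId".as("segmentId_2"),
  $"Sequence".as("Sequence_2"),
  $"Action".as("Action_2"))

val merged = df1.join(d2, Seq("OgId", "ItemId"), "outer")
  .select($"OgId", $"ItemId",
    coalesce($"segmentId_2", $"segmentId").as("segmentId"),
    coalesce($"Sequence_2", $"Sequence").as("Sequence"),
    coalesce($"Action_2", $"Action").as("Action"))

Note that if the "null" in the text files is read as the literal string "null" rather than SQL null (every column is a string here), coalesce will not catch it, and you have to compare against the string instead, as the accepted code at the bottom does.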

Here is what I got from one of the answers:

// Keep only the df1 rows whose OgId does not appear in df2,
// then append df2 and drop exact duplicate rows.
val tempdf = df2.select("OgId").withColumnRenamed("OgId", "OgId_1")

df1 = df1.join(tempdf, df1("OgId") === tempdf("OgId_1"), "left")
df1 = df1.filter("OgId_1 is null").drop("OgId_1")
df1 = df1.unionAll(df2).distinct()
df1.show()
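One caveat with that snippet: distinct() only removes rows that are identical in every column, so two versions of the same key both survive. Deduplicating by the composite key would need dropDuplicates; a sketch, with the caveat that the surviving row per key is arbitrary unless the frame is ordered first:

// distinct() keeps both versions of a key when any column differs;
// dropDuplicates collapses to one row per (OgId, ItemId).
val unique = df1.unionAll(df2).dropDuplicates(Seq("OgId", "ItemId"))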


But I want to update df1 with df2 in the order of EntryDate.

For example, for 4295877341 | 136 there are two updates, so the updates should be applied from df2 in the same order as the data appears in df2.

This is because sometimes, for some rows, an update happens first and then a delete. So if the delete were applied first, the update would throw an error, since it would not find any row to update.

Finally, if Action is 'D' then the row from df1 should be removed, and again this should happen in the proper order.

I hope my question is clear.
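One way to express that ordering, assuming df2 carries the EntryDate column shown in the tables above (the five-column schema in the parsing code below does not include it): collapse df2 to the most recent change per key with a window function, so whichever of the update/delete happened last is the one that wins. A sketch (note that on Spark 1.x window functions need a HiveContext):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number

// Keep only the latest row per (OgId, ItemId), ranked by EntryDate.
val w = Window.partitionBy("OgId", "ItemId").orderBy($"EntryDate".desc)
val latestPerKey = df2
  .withColumn("rn", row_number().over(w))
  .filter($"rn" === 1)
  .drop("rn")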

Updating with the suggested answer's code:

package sparkSql

import org.apache.spark.{ SparkConf, SparkContext }
import org.apache.spark.sql.{ Row, SQLContext }
import org.apache.spark.sql.types.{ StructType, StructField, StringType }
import org.apache.spark.sql.functions._

object PcfpDiff {

  def main(args: Array[String]) {

    val conf = new SparkConf().setAppName("WordCount").setMaster("local[*]") // Creating spark configuration
    // val conf = new SparkConf().setAppName("WordCount");
    conf.set("spark.shuffle.blockTransferService", "nio")
    val sc = new SparkContext(conf) // Creating spark context
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    val schema = StructType(Array(
      StructField("OrgId", StringType),
      StructField("ItemId", StringType),
      StructField("segmentId", StringType),
      StructField("Sequence", StringType),
      StructField("Action", StringType)))

    val textRdd1 = sc.textFile("/home/cloudera/TRF/pcfp/Text1.txt")
    val rowRdd1 = textRdd1.map(line => Row.fromSeq(line.split("\\|\\^\\|", -1)))
    var df1 = sqlContext.createDataFrame(rowRdd1, schema)

    val textRdd2 = sc.textFile("/home/cloudera/TRF/pcfp/Text2.txt")
    val rowRdd2 = textRdd2.map(line => Row.fromSeq(line.split("\\|\\^\\|", -1)))
    var df2 = sqlContext.createDataFrame(rowRdd2, schema)

    val tempdf2 = df2.withColumnRenamed("segmentId", "segmentId_1")
      .withColumnRenamed("Sequence", "Sequence_1")
      .withColumnRenamed("Action", "Action_1")

    df1.join(tempdf2, Seq("OrgId", "ItemId"), "outer")
      .select($"OrgId", $"ItemId",
        when($"segmentId_1".isNotNull, $"segmentId_1").otherwise($"segmentId").as("segmentId"),
        when($"Sequence_1".isNotNull, $"Sequence_1").otherwise($"Sequence").as("Sequence"),
        when($"Action_1".isNotNull, $"Action_1").otherwise($"Action").as("Action"))

    df1.show()
  }
}


And I am getting the output below:
segmentId and Sequence are not getting updated.

+----------+------+---------+--------+------+
|     OrgId|ItemId|segmentId|Sequence|Action|
+----------+------+---------+--------+------+
|4295877341|   136|        4|       1|  I|!||
|4295877346|   136|        4|       1|  I|!||
|4295877341|   138|        2|       1|  I|!||
|4295877341|   141|        4|       1|  I|!||
|4295877341|   143|        2|       1|  I|!||
|4295877341|   145|       14|       1|  I|!||
| 123456789|   145|       14|       1|  I|!||
+----------+------+---------+--------+------+


Data set 1

4295877341|^|136|^|4|^|1|^|I|!|
4295877346|^|136|^|4|^|1|^|I|!|
4295877341|^|138|^|2|^|1|^|I|!|
4295877341|^|141|^|4|^|1|^|I|!|
4295877341|^|143|^|2|^|1|^|I|!|
4295877341|^|145|^|14|^|1|^|I|!|
123456789|^|145|^|14|^|1|^|I|!|


Data set 2

4295877341|^|136|^|4|^|1|^|I|!|
4295877341|^|136|^|5|^|2|^|I|!|
4295877341|^|138|^|4|^|5|^|I|!|
4295877341|^|141|^|9|^|1|^|I|!|
4295877341|^|143|^|null|^|2|^|I|!|
4295877343|^|149|^|14|^|2|^|I|!|
123456789|^|145|^|14|^|1|^|D|!|
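A side note on this format: the rows are split only on the |^| delimiter, so the trailing |!| record terminator stays glued to the last field, which is why Action holds values like "I|!|" and "D|!|". A quick check in the REPL:

// "|!|" is not part of the split pattern, so it stays in the last field:
"123456789|^|145|^|14|^|1|^|D|!|".split("\\|\\^\\|", -1)
// => Array(123456789, 145, 14, 1, D|!|)

That is also why the answer below filters deletes with contains("D") rather than === "D".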


Regards,
Sudarshan

Answer Source

Here's the working solution I got for you:

// except() drops the df2 rows that are identical to rows in df1,
// leaving only the actual changes.
val tempdf2 = df2.except(df1).withColumnRenamed("segmentId", "segmentId_1")
  .withColumnRenamed("Sequence", "Sequence_1")
  .withColumnRenamed("Action", "Action_1")

// Outer-join on the composite key and prefer the df2 value unless it is
// missing or the literal string "null" (every column is StringType here).
// Finally drop the rows whose Action carries a 'D'.
val df3 = df1.join(tempdf2, Seq("OrgId", "ItemId"), "outer")
  .select($"OrgId", $"ItemId",
    when($"segmentId_1" =!= "null", $"segmentId_1").otherwise($"segmentId").as("segmentId"),
    when($"Sequence_1" =!= "null", $"Sequence_1").otherwise($"Sequence").as("Sequence"),
    when($"Action_1" =!= "null", $"Action_1").otherwise($"Action").as("Action"))
  .filter(!$"Action".contains("D"))
df3.show()
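Two notes on why this behaves differently from the question's version: there, the join/select result was never assigned to anything, so df1.show() printed the original frame untouched, whereas here the result is assigned to df3 and shown. And since every column is read as StringType from the text split, a missing segmentId arrives as the literal string "null", which is why the conditions compare against "null" instead of using isNotNull.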

I hope the answer is helpful, and if not, you can take the ideas and modify them according to your needs.