
Spark SQL DataFrame join with filter is not working

I'm trying to filter df1 by joining it with df2 on the channel column and then dropping some rows of df1 based on the join result.

df1:

+---------------+----------+
|        channel|rag_status|
+---------------+----------+
|            STS|     green|
|Rapid Cash Plus|     green|
|        DOTOPAL|     green|
|     RAPID CASH|     green|
+---------------+----------+


df2:

+---------------+----------+
|        channel|rag_status|
+---------------+----------+
|            STS|      blue|
|Rapid Cash Plus|      blue|
|        DOTOPAL|      blue|
+---------------+----------+


Sample code is:

df1.join(df2, df1.col("channel") === df2.col("channel"), "leftouter")
.filter(not(df1.col("rag_status") === "green"))
.select(df1.col("channel"), df1.col("rag_status")).show


It's not returning any records.

I'm expecting the output below, which is df1 after filtering records on the channel and green status condition: if the same channel is present in df2 and the df1 rag_status is green, then remove that record from df1 and return only the remaining records from df1.

Expected output is:

+---------------+----------+
|        channel|rag_status|
+---------------+----------+
|     RAPID CASH|     green|
+---------------+----------+

Answer
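
First, a note on why your query returns no records: the left outer join keeps every row of df1, and since every df1 row has rag_status green, the filter not(df1.col("rag_status") === "green") discards them all. One direct fix is to also keep the rows that found no match in df2, which carry null in df2's columns after a left outer join. A minimal sketch of that fix, keeping your original join:

import org.apache.spark.sql.functions.not

df1.join(df2, df1.col("channel") === df2.col("channel"), "leftouter")
  // unmatched df1 rows have null in df2's columns after a left outer join
  .filter(df2.col("channel").isNull || not(df1.col("rag_status") === "green"))
  .select(df1.col("channel"), df1.col("rag_status")).show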

Alternatively, you can build the same filter out of two joins and an except, like this:

// sample df1, restricted to green rows
val df1 = sc.parallelize(Seq(("STS", "green"), ("Rapid Cash Plus", "green"), ("RAPID CASH", "green")))
  .toDF("channel", "rag_status").where($"rag_status" === "green")
// rename df2's status column so the joined result has unambiguous columns
val df2 = sc.parallelize(Seq(("STS", "blue"), ("Rapid Cash Plus", "blue"), ("DOTOPAL", "blue")))
  .toDF("channel", "rag_status").withColumnRenamed("rag_status", "rag_status2")
// left join keeps every df1 row; inner join keeps only matched channels;
// except subtracts the matches, leaving the df1 rows with no channel in df2
val leftJoinResult = df1.join(df2, Array("channel"), "left")
val innerJoinResult = df1.join(df2, "channel")
val resultDF = leftJoinResult.except(innerJoinResult).drop("rag_status2")
resultDF.show

Spark-shell Output:

scala> val df1=sc.parallelize(Seq(("STS","green"),("Rapid Cash Plus","green"),("RAPID CASH","green"))).toDF("channel","rag_status").where($"rag_status"==="green")
df1: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [channel: string, rag_status: string]

scala> val df2=sc.parallelize(Seq(("STS","blue"),("Rapid Cash Plus","blue"),("DOTOPAL","blue"))).toDF("channel","rag_status").withColumnRenamed("rag_status","rag_status2")
df2: org.apache.spark.sql.DataFrame = [channel: string, rag_status2: string]

scala> val leftJoinResult=df1.join(df2,Array("channel"),"left")
leftJoinResult: org.apache.spark.sql.DataFrame = [channel: string, rag_status: string ... 1 more field]

scala> val innerJoinResult=df1.join(df2,"channel")
innerJoinResult: org.apache.spark.sql.DataFrame = [channel: string, rag_status: string ... 1 more field]

scala> val resultDF=leftJoinResult.except(innerJoinResult).drop("rag_status2")
resultDF: org.apache.spark.sql.DataFrame = [channel: string, rag_status: string]

scala> resultDF.show
+----------+----------+                                                         
|   channel|rag_status|
+----------+----------+
|RAPID CASH|     green|
+----------+----------+
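
For what it's worth, since Spark 2.0 the same result can be obtained with a single anti-join instead of the left-join/inner-join/except combination. A minimal sketch, reusing the df1 and df2 defined above:

// "left_anti" keeps only the df1 rows whose channel has no match in df2;
// only df1's columns survive, so there is no rag_status2 to drop
val antiJoinResult = df1.join(df2, Seq("channel"), "left_anti")
antiJoinResult.show

On this sample data it prints the same single RAPID CASH row.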