Shashishekhar Deshpande - 1 month ago
Scala Question

In a Spark DataFrame, how do I get duplicate records and distinct records in two DataFrames?

I am working on a problem in which I load data from a Hive table into a Spark DataFrame, and now I want all the unique account IDs in one DataFrame and all the duplicates in another. For example, if I have account IDs 1, 1, 2, 3, 4, I want to get 2, 3, 4 in one DataFrame and 1, 1 in the other. How can I do this?

Answer
val acctDF = List(("1", "Acc1"), ("1", "Acc1"), ("1", "Acc1"), ("2", "Acc2"), ("2", "Acc2"), ("3", "Acc3")).toDF("AcctId", "Details")
scala> acctDF.show()
+------+-------+
|AcctId|Details|
+------+-------+
|     1|   Acc1|
|     1|   Acc1|
|     1|   Acc1|
|     2|   Acc2|
|     2|   Acc2|
|     3|   Acc3|
+------+-------+

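// Go through .rdd explicitly: on Spark 2.x, DataFrame.map no longer returns an RDD, so reduceByKey is not available on it
val countsDF = acctDF.rdd.map(rec => (rec.getString(0), 1)).reduceByKey(_ + _).toDF("AcctId", "AcctCount")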

val accJoinedDF = acctDF.join(countsDF, acctDF("AcctId")===countsDF("AcctId"), "left_outer").select(acctDF("AcctId"), acctDF("Details"), countsDF("AcctCount"))

scala> accJoinedDF.show()
+------+-------+---------+
|AcctId|Details|AcctCount|
+------+-------+---------+
|     1|   Acc1|        3|
|     1|   Acc1|        3|
|     1|   Acc1|        3|
|     2|   Acc2|        2|
|     2|   Acc2|        2|
|     3|   Acc3|        1|
+------+-------+---------+
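
The count-and-join step above can probably also be written with the DataFrame API alone, without dropping to an RDD. The following is a minimal sketch, assuming Spark 2.x or later (for SparkSession); the object name CountPerAccount and the helper withAcctCount are hypothetical names chosen for illustration and do not come from the original answer.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical helper object, for illustration only.
object CountPerAccount {
  // Attach the number of rows per AcctId to every row of the input.
  def withAcctCount(acctDF: DataFrame): DataFrame = {
    // groupBy/count yields one row per AcctId with a Long column named "count".
    val countsDF = acctDF.groupBy("AcctId").count()
      .withColumnRenamed("count", "AcctCount")
    // Joining on Seq("AcctId") keeps a single AcctId column in the result.
    acctDF.join(countsDF, Seq("AcctId"), "left_outer")
  }

  def main(args: Array[String]): Unit = {
    // Assumption: a local session is enough for trying this out.
    val spark = SparkSession.builder()
      .appName("CountPerAccount")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    val acctDF = List(("1", "Acc1"), ("1", "Acc1"), ("1", "Acc1"),
      ("2", "Acc2"), ("2", "Acc2"), ("3", "Acc3")).toDF("AcctId", "Details")

    withAcctCount(acctDF).show()
    spark.stop()
  }
}
```

The result has the same three columns as accJoinedDF, so the filters on AcctCount that follow should apply to it unchanged. Note that AcctCount is a Long here rather than an Int.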


val distAcctDF = accJoinedDF.filter($"AcctCount"===1)
scala> distAcctDF.show()
+------+-------+---------+
|AcctId|Details|AcctCount|
+------+-------+---------+
|     3|   Acc3|        1|
+------+-------+---------+

val duplAcctDF = accJoinedDF.filter($"AcctCount">1)
scala> duplAcctDF.show()
+------+-------+---------+
|AcctId|Details|AcctCount|
+------+-------+---------+
|     1|   Acc1|        3|
|     1|   Acc1|        3|
|     1|   Acc1|        3|
|     2|   Acc2|        2|
|     2|   Acc2|        2|
+------+-------+---------+

Or, to show each duplicated account only once:
scala> duplAcctDF.distinct.show()
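
As an alternative to counting and joining, the same split can be sketched with a window function, which attaches the per-account count to each row in one step. This is only a sketch under assumptions: it targets Spark 2.x or later, and the object name SplitDuplicates and the helper split are hypothetical names, not part of the answer above.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, count, lit}

// Hypothetical helper object, for illustration only.
object SplitDuplicates {
  // Returns (rows whose AcctId occurs once, rows whose AcctId occurs more than once).
  def split(acctDF: DataFrame): (DataFrame, DataFrame) = {
    // Count the rows in each AcctId partition and attach that count to every row.
    val byAcct = Window.partitionBy("AcctId")
    val withCount = acctDF.withColumn("AcctCount", count(lit(1)).over(byAcct))

    val distAcctDF = withCount.filter(col("AcctCount") === 1).drop("AcctCount")
    val duplAcctDF = withCount.filter(col("AcctCount") > 1).drop("AcctCount")
    (distAcctDF, duplAcctDF)
  }

  def main(args: Array[String]): Unit = {
    // Assumption: a local session is enough for trying this out.
    val spark = SparkSession.builder()
      .appName("SplitDuplicates")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    val acctDF = List(("1", "Acc1"), ("1", "Acc1"), ("1", "Acc1"),
      ("2", "Acc2"), ("2", "Acc2"), ("3", "Acc3")).toDF("AcctId", "Details")

    val (distAcctDF, duplAcctDF) = split(acctDF)
    distAcctDF.show()
    duplAcctDF.show()
    spark.stop()
  }
}
```

Unlike the filters above, this version drops the helper AcctCount column, so both results keep the original two columns. Whether it performs better than the join depends on the data and the cluster, so it is worth measuring rather than assuming.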