Atif Shahzad - 3 months ago
Scala Question

Comparing two DataFrames for history checking in Apache Spark using Scala

I have a DataFrame with the following structure:

EmployeeDF

id  name  date       code
1   John  2015-4-14  C11
2   Roy   2011-5-20  C11
3   John  2010-5-20  C11
4   John  2012-5-20  C10


Now I want to check the history: was the same code applied to the same employee two years ago? How can I do that? This is only sample data; I have millions of rows in the DataFrame and I care about performance. Joining the DataFrame slows things down because rows are repeated, so the self join produces a Cartesian product and duplicates rows. I want to achieve this with something like a map.


EDIT: The current code (added from the OP's comments.)


In the first step I get the employees that appear only once: since we are checking history, an employee that exists only once has no history at all. The code for this step is:

val uniqueEmpDF = SparkConfig
  .sc
  .sqlContext
  .sql("SELECT *, '1' AS level FROM cpeFirstStep " +
    "WHERE e_id IN (SELECT e_id FROM cpeFirstStep WHERE code = 'C11' " +
    "GROUP BY e_id HAVING COUNT(e_id) = 1)")
  .cache()
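
For what it is worth, the same step can also be written with the DataFrame API instead of an IN subquery. This is only a rough sketch: it assumes the cpeFirstStep table is also available as a DataFrame, which I am calling empDF here (a made-up name):

import org.apache.spark.sql.functions.{col, lit}

// Employees whose code 'C11' appears exactly once
val singleC11 = empDF
  .filter(col("code") === "C11")
  .groupBy("e_id")
  .count()
  .filter(col("count") === 1)
  .select("e_id")

// Keep only the rows for those employees and tag them with level '1'
val uniqueEmp = empDF
  .join(singleC11, Seq("e_id")) // empDF is an assumed name for the cpeFirstStep data
  .withColumn("level", lit("1"))
  .cache()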


The second step is to get the employees that are repeated; the code for this is:

val repeatedEmpDF = SparkConfig
  .sc
  .sqlContext
  .sql("SELECT *, '2' AS level FROM cpeFirstStep " +
    "WHERE e_id IN (SELECT e_id FROM cpeFirstStep WHERE code = 'C11' " +
    "GROUP BY e_id HAVING COUNT(e_id) > 1)")
  .cache()
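
The same DataFrame-API sketch applies here with the count predicate flipped (again assuming the hypothetical empDF from above):

// Employees whose code 'C11' appears more than once
val repeatedC11 = empDF
  .filter(col("code") === "C11")
  .groupBy("e_id")
  .count()
  .filter(col("count") > 1)
  .select("e_id")

val repeatedEmp = empDF
  .join(repeatedC11, Seq("e_id"))
  .withColumn("level", lit("2"))
  .cache()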


Now the main step is the following:

val historyJoin = SparkConfig
  .sc
  .sqlContext
  .sql("SELECT x.*, CASE WHEN y.code = x.code THEN '3' ELSE '4' END AS level " +
    "FROM repeatedEmpDF x LEFT JOIN repeatedEmpDF y " +
    "ON y.e_id = x.e_id AND y.code = x.code AND y.date < x.date - INTERVAL 2 YEAR")

Answer

So, there are many different ways of writing this, but assuming I have understood your example correctly, the Spark code below will do the trick. Note that I have added some extra data to the sample you gave, and I also assume that the employee John should have the same id throughout. So my test input looks like this:

import org.joda.time.LocalDate

val df = sc.parallelize(List(
  (1, "John", new LocalDate(2015, 4, 14), "C11"),
  (2, "Roy",  new LocalDate(2011, 5, 20), "C11"),
  (1, "John", new LocalDate(2010, 5, 20), "C11"),
  (1, "John", new LocalDate(2012, 5, 20), "C10"),
  (1, "John", new LocalDate(2013, 1, 14), "C11")))

Then, for the actual identification of employees that have had the same code at least two years apart:

df.map{case (id: Int, name: String, date: LocalDate, code: String) => ((id, name), List((date, code)))}
  .reduceByKey(_ ++ _) // collect all (date, code) pairs per employee
  .filter{case (_, listOfCodes) => listOfCodes.length >= 2} // not interested in employees with only one code registered
  .flatMapValues(list => {
    // Walk the date-sorted list and return the first adjacent pair that
    // carries the same code more than two years apart
    def sameCodeForTwoYears(list: List[(LocalDate, String)]): List[(LocalDate, String)] = {
      list match {
        case x :: Nil => List.empty
        case x :: xs => if (xs.head._1.minusYears(2).isAfter(x._1) && x._2 == xs.head._2) {
          List(x, xs.head)
        } else sameCodeForTwoYears(xs)
        case Nil => List.empty
      }
    }
    sameCodeForTwoYears(list.sortWith((left, right) => left._1.isBefore(right._1)))
  })
  .map{case ((id, name), (date, code)) => (id, name, date, code)}
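
To inspect the matches on the driver (assuming the pipeline above is assigned to a val, which I am calling result here):

result.collect().foreach(println)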

This will output:

(1,John,2013-01-14,C11)                                                         
(1,John,2015-04-14,C11)

Is this what you were looking for?

I have no idea what sort of performance you will get on your dataset, but hopefully you will get a general idea of how this can be written in Spark.