mongolol mongolol - 1 month ago 12
Apache Configuration Question

Selecting the Earliest and Latest Dates from a Grouped RDD

I have a grouped RDD of the form (patientID, [Medication]), where Medication is the below case class:

case class Medication(patientID: String, date: Date, medicine: String)


The RDD is formed by the below line:

val grpMeds = medication.groupBy(_.patientID)


Where medication is an RDD of the form RDD[Medication].

For each patient, I'm trying to find the earliest and latest dates a particular kind of medicine, "medicine_A," was administered (note that medicine is a method of
case class Medication
). What I'm looking to obtain is an RDD of the format RDD[patientID, earliestDate, latestDate], but can't figure out how to obtain this.

Any help would be hugely appreciated. An example of what the data looks like (obtained from
grpMeds.take(0).foreach(println)
) is shown below.

Medication(000961291-01,Tue Jun 21 19:45:00 UTC 2005,Isotonic Saline (0.9%))
Medication(000096430-01,Mon Nov 15 20:45:00 UTC 2010,insulin aspart)

Answer

Using groupBy is a very inefficient way to get there. As a replacement I would recommend using Spark SQL or reduceByKey.

For Spark SQL you should convert medication to a DataFrame:

import spark.implicits._  // import sqlContext.implicits._

val medicationDF = medication.toDF

and use groupBy followed by agg:

medicationDF.groupBy($"patientID", $"medicine").agg(min($"date"), max($"date"))

For this solution date should be java.sql.Date or java.sql.Timestamp.

For reduceByKey first you should reshape medication to get key composed from patientId and medicine and value which is a duplicated date:

val medicationPairs = medication.map(m => 
  ((m.patientID, m.medicine), (m.date, m.date))
)

Next reduceByKey:

medicationPairs.reduceByKey { 
  case ((xMin, xMax), (yMin, yMax)) => (
    if(xMin.before(yMin)) xMin else yMin,
    if(xMax.after(yMax))  xMax else yMax
  )
}
Comments