
Spark Scala create pair RDD with sorted Date values

Caveat: I am new to Spark and Scala. I've found a handful of questions on Stack Overflow that are very similar to mine, but haven't been able to translate those into my problem.

Context: I have a pair RDD, initially with records of the form (id, date), and I want to create an RDD of the form (id, last_date_seen). In the raw data the date is a string, and I've used Joda to convert it to a DateTime.

I have successfully done this using combineByKey, and I understand that groupByKey is inefficient and this might not be practical in big cases, but I'm trying to understand approaches using the range of calls.
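
For reference, a rough sketch of the combineByKey version (illustrative, not my exact code; it assumes rdd is an RDD[(String, DateTime)] as described above):

import org.joda.time.DateTime

// Keep only the later of two dates at every merge step, so no per-key
// list of values is ever materialized.
val lastSeen = rdd.combineByKey(
  (d: DateTime) => d,                                             // first value seen for a key
  (acc: DateTime, d: DateTime) => if (d.isAfter(acc)) d else acc, // merge a value within a partition
  (a: DateTime, b: DateTime) => if (b.isAfter(a)) b else a        // merge combiners across partitions
)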

What I want to do is groupByKey and then mapValues, taking the list of values produced by groupByKey and getting the max of the list.

What I've tried:

(I created an ordering on DateTime based on a different Stack Overflow question, so there is an ordering.)

I've tried a number of approaches, and most give me an exception that the task is not serializable. One example is,

rdd.groupByKey().mapValues(_.toList.sorted.last)


I've tried any number of variants of this ;) Without the toList, I get an exception that sorted is not a member of Iterable[org.joda.time.DateTime]. I'm successful in using mapValues to do simpler things, but once I try to add the sort, things go bad. I also tried sortBy and specifying the Ordering.

Insights into why things sent to the sorted method aren't serializable would be helpful to me overall. I'm not sure how to recognize when I'm falling into this trap.

One of the similar Stack Overflow questions suggested that instead of using mapValues, you could just use sortBy and specify that it operates on the second element, i.e. .sortBy(_._2). This also failed for me. Ideally, if it made sense to do it that way, I'd like to know that too.

This seems like a really straightforward and probably common thing to do, so I feel like I'm missing something.

Edit: added for further detail on the exception. Note, though, that I wasn't able to reproduce this error.

The not-serializable errors in the stack trace indicate that the implicit ordering I had used, found in another Stack Overflow answer, was the culprit. Note that I was not able to reproduce this error, which had aggravated me for hours (see answer).

Caused by: java.io.NotSerializableException:
$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$Joda$
Serialization stack:
- object not serializable (class:
$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$Joda$,
value: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$Joda$@6fc2db37)
- field (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC,
name: Joda$module, type: class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$Joda$)


The Joda object had been defined just before:

import org.joda.time.DateTime

object Joda {
  implicit def dateTimeOrdering: Ordering[DateTime] =
    Ordering.fromLessThan(_ isBefore _)
}

Answer

My original question really had two components:

* how to transform the RDD using groupByKey in order to retrieve the last seen date for each id, and
* why the approach I tried was giving a "task not serializable" error.

Unfortunately, after restarting the spark-shell and retracing my steps, I was unable to reproduce this error. The code I had listed in the question, paired with the DateTime ordering I had already established, worked fine. I recently had another issue like this, which I was able to trace down to a conflict among implicit values I had set earlier in the shell for totally different purposes. I suspect that was also the culprit here, but wasn't able to verify it.

The other Stack Overflow question referenced in the comments indicates that Joda has caused issues for others as well.

For completeness, I am able to do the transform and extract the last date seen in several ways. The most straightforward is the one given by @zero323 in their comment, using reduceByKey, sketched below.
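
A sketch of that approach (my reconstruction, not @zero323's exact code; it assumes rdd is an RDD[(String, DateTime)]):

// reduceByKey combines values pairwise, so no per-key list is ever built.
val lastSeen = rdd.reduceByKey((a, b) => if (a.isAfter(b)) a else b)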

Using groupByKey, the code in the question

rdd.groupByKey().mapValues(_.toList.sorted.last)

works fine when the implicit ordering below is in place:

object Joda {
  implicit def dateTimeOrdering: Ordering[DateTime] =
    Ordering.fromLessThan(_ isBefore _)
}
import Joda._

Similarly,

rdd.groupByKey.mapValues(_.toList.max)

produces the same result.

I've also replicated the results using an ordering defined and passed to sorted explicitly.
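
For example (the name explicitOrdering is illustrative, not from my original session):

val explicitOrdering: Ordering[DateTime] = Ordering.fromLessThan(_ isBefore _)

// Pass the ordering to sorted directly instead of relying on an implicit.
rdd.groupByKey().mapValues(_.toList.sorted(explicitOrdering).last)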

Unfortunately, I was unable to determine why the object Joda threw an exception in the first session and didn't in the many subsequent sessions I tried.
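
For anyone who does hit the NotSerializableException in the spark-shell, one workaround that is commonly suggested (which I could not verify against this exact error, since I couldn't reproduce it) is to make the wrapper object explicitly Serializable so Spark can ship it with the task:

import org.joda.time.DateTime

// Assumption: the $iwC noise in the trace points at the REPL wrapper being
// captured by the closure; marking the object Serializable sidesteps that.
object Joda extends Serializable {
  implicit val dateTimeOrdering: Ordering[DateTime] =
    Ordering.fromLessThan(_ isBefore _)
}
import Joda._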
