Dmitry Dzhus - 13 days ago
Scala Question

subtractByKey modifies values in the source RDD

I'm experiencing a problem with subtractByKey using Spark 2.0.2 with Scala 2.11.x (it also reproduces with Spark 1.6.2 and Scala 2.10):

Relevant code:

object Types {
  type ContentId = Int
  type ContentKey = Tuple2[Int, ContentId]
  type InternalContentId = Int
}

val inverseItemIDMap: RDD[(InternalContentId, ContentKey)] = itemIDMap.map(_.swap).cache()
logger.info(s"Built an inverse map of ${inverseItemIDMap.count()} item IDs")
logger.info(inverseItemIDMap.collect().mkString("I->E ", "\nI->E ", ""))

val superfluousItems: RDD[(InternalContentId, Int)] = .. .cache()
logger.info(superfluousItems.collect().mkString("SI ", "\nSI ", ""))

val filteredInverseItemIDMap: RDD[(InternalContentId, ContentKey)] =
  inverseItemIDMap.subtractByKey(superfluousItems).cache() // <<===!!!
logger.info(s"${filteredInverseItemIDMap.count()} items in the filtered inverse ID mapping")
logger.info(filteredInverseItemIDMap.collect().mkString("F I->E ", "\nF I->E ", ""))


The operation in question is .subtractByKey. Both RDDs involved are cached and forced via count() prior to calling subtractByKey, so I would expect the result to be unaffected by how exactly superfluousItems is built.

I added debugging output and filtered the resulting logs by the relevant InternalContentId values (829911, 830071). Output:

Built an inverse map of 827354 item IDs
.
.
I->E (829911,(2,1135081))
I->E (830071,(1,2295102))
.
.
748190 items in the training set had less than 28 ratings
SI (829911,3)
.
.
79164 items in the filtered inverse ID mapping
F I->E (830071,(2,1135081))


There's no element with key 830071 in superfluousItems (SI), so it is correctly kept in the result. However, its value is for some reason replaced with the one from key 829911. How could this be? I cannot reproduce it locally, only when running on a multi-machine cluster. Is this a bug, or am I missing something?

Answer

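The problem was my assumption that caching and forcing an RDD guarantees it will never be re-evaluated. Caching is best-effort: if cached partitions are evicted or an executor is lost, Spark recomputes them from the RDD's lineage. inverseItemIDMap is built using a non-deterministic operation, so multiple uses of it can give different results: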

val itemIDMap: RDD[(ContentKey, InternalContentId)] =
  rawEvents
  .map(_.content)
  .distinct
  .zipWithUniqueId()
  .map(u => (u._1, u._2.toInt))
  .cache()
logger.info(s"Built a map of ${itemIDMap.count()} item IDs")
val inverseItemIDMap: RDD[(InternalContentId, ContentKey)] =
  itemIDMap.map(_.swap).cache()
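
To see why re-evaluation can swap values between keys, here is a minimal sketch in plain Scala (no Spark needed) that mimics the ID scheme documented for zipWithUniqueId: items in the kth of n partitions get the ids k, n+k, 2n+k, and so on. The helper zipWithUniqueIdSim, the two-partition layout, and the key (1, 42) are assumptions made up for illustration; only the other two content keys come from the logs above.

```scala
object ZipWithUniqueIdSketch {
  type ContentKey = (Int, Int)

  // Hypothetical stand-in for RDD.zipWithUniqueId: the item at position i of
  // partition k gets id i * n + k, where n is the number of partitions.
  def zipWithUniqueIdSim[T](partitions: Seq[Seq[T]]): Seq[(T, Long)] = {
    val n = partitions.length
    for {
      (part, k) <- partitions.zipWithIndex
      (item, i) <- part.zipWithIndex
    } yield (item, i.toLong * n + k)
  }

  def main(args: Array[String]): Unit = {
    // Same elements, but partition 0 arrives in a different order on the
    // second evaluation (assumed here to model a recomputed shuffle output).
    val firstRun: Seq[Seq[ContentKey]] =
      Seq(Seq((2, 1135081), (1, 2295102)), Seq((1, 42)))
    val secondRun: Seq[Seq[ContentKey]] =
      Seq(Seq((1, 2295102), (2, 1135081)), Seq((1, 42)))

    val ids1 = zipWithUniqueIdSim(firstRun).toMap
    val ids2 = zipWithUniqueIdSim(secondRun).toMap
    println(ids1 == ids2) // false: the same content key gets a different ID

    // Sorting each partition removes the dependence on arrival order
    // in this simplified model.
    val stable1 = zipWithUniqueIdSim(firstRun.map(_.sorted)).toMap
    val stable2 = zipWithUniqueIdSim(secondRun.map(_.sorted)).toMap
    println(stable1 == stable2) // true
  }
}
```

If one downstream RDD sees the IDs from the first evaluation and another sees those from the second, a key such as 830071 can end up paired with the content that previously belonged to a different key, which matches the symptom in the question.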

I made the operation stable by adding .sortBy(c => c) before .zipWithUniqueId() and this solved the issue.
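
For reference, the fix might look roughly like the sketch below. It assumes Spark is on the classpath; the object name StableItemIds, the function buildItemIDMap, and its parameter contentKeys (standing in for rawEvents.map(_.content)) are hypothetical names, since the definition of rawEvents is not shown.

```scala
import org.apache.spark.rdd.RDD

object StableItemIds {
  type ContentId = Int
  type ContentKey = (Int, ContentId)
  type InternalContentId = Int

  // `contentKeys` is a hypothetical stand-in for rawEvents.map(_.content).
  def buildItemIDMap(contentKeys: RDD[ContentKey]): RDD[(ContentKey, InternalContentId)] =
    contentKeys
      .distinct()
      // Sort before assigning IDs so that the element order no longer
      // depends on the order in which shuffle output happens to arrive.
      .sortBy(c => c)
      .zipWithUniqueId()
      .map { case (key, id) => (key, id.toInt) }
      .cache()
}
```

The sort adds an extra shuffle, so this trades some runtime for ID assignments that stay the same if the RDD has to be recomputed.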