Balaji Reddy - 1 month ago
Scala Question

Effective exception handling with Spark and Scala

I know the question is not new, but my context is different. I have been trying to find the best possible way to handle exceptions effectively. Below is my experimental code:

import scala.util.Try

val in = List("1", "2", "3", "abc")
val sc = prepareConfig()
val rdd = sc.parallelize(in)
val mapRDD = rdd.map { x => Try { x.toInt } }
val successRDD = mapRDD.filter { x => x.isSuccess }
val successRDD2 =
  successRDD.map { x => Try { x.get * x.get } }.filter { _.isSuccess }
.
. // some more Transformation operations based on the use case
.

successRDD10.collect()


Assume that I may need around 10 transformations to reach the end result. Do I need to wrap every transformation in Try {}? The expectation is that an error may occur in any of the 10 transformations, so I am filtering out the successful results and passing them on to the next stage. Is the above an effective way of handling exceptions, or is there a recommended alternative approach?

Answer

A better way:

val res: RDD[Int] = rdd.flatMap(x => Try(x.toInt).map(i => i * i).toOption)

It really does what you are trying to achieve. How?

  1. Each String is converted into an Int: Try(x.toInt)
  2. If the conversion is successful, the integer is multiplied by itself: map(i => i * i)
  3. The resulting Try[Int] is converted to an Option[Int] that is None if the Try[Int] is a Failure and Some(value) otherwise: toOption
  4. Thanks to the flatMap operation, only the Option[Int] values that are defined (i.e. not None, meaning the conversion succeeded) remain in the resulting RDD[Int]

No need for the extra filters and isSuccess calls.
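
If it helps, here is a minimal, self-contained sketch of the same approach; a local SparkContext stands in for the question's prepareConfig(), and any further transformations would simply be chained as additional map calls inside the same Try:

import scala.util.Try
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object TryFlatMapExample {
  def main(args: Array[String]): Unit = {
    // Local SparkContext for illustration only; use your own prepareConfig() in practice.
    val sc = new SparkContext(new SparkConf().setAppName("try-flatmap").setMaster("local[*]"))

    val in = List("1", "2", "3", "abc")
    val rdd = sc.parallelize(in)

    // One Try per record; each fallible step is chained with map,
    // and toOption turns a Failure into None so flatMap drops it.
    val res: RDD[Int] = rdd.flatMap { x =>
      Try(x.toInt)        // parse, fails for "abc"
        .map(i => i * i)  // square the parsed value
        .toOption         // Success(v) -> Some(v), Failure -> None
    }

    println(res.collect().mkString(", ")) // prints: 1, 4, 9
    sc.stop()
  }
}

The design point is that a single Try per record carries the whole chain of transformations, so a failure anywhere in the chain simply drops that record, without a separate isSuccess/filter pass at every stage.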
