pythonic pythonic - 2 months ago 8
Scala Question

How can I pass Spark's accumulator to a function?

I want to do something like this.

val ac = sc.accumulator(0)
a = a.map(x => someFunction(x, the_accumulator_object))

What should be in the place of the_accumulator_object in the code above? Would writing ac there be just fine?

Also, in the function

def someFunction(x: TypeOfX, a: TypeOfAccumulator) : ReturnType =

What should be in the place of TypeOfAccumulator in the function above?


Additional info about Spark accumulators can be found here

According to the Scaladoc for the method that creates the accumulator:

/**
 * Create an [[org.apache.spark.Accumulator]] variable of a given type, with a name for display
 * in the Spark UI. Tasks can "add" values to the accumulator using the += method. Only the
 * driver can access the accumulator's value.
 */

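When you call sc.accumulator(0), the accumulator type is inferred from the initial value, so here it is Int. You can use your own type as well, but then an implicit AccumulatorParam for that type must be in scope, because it defines how the += method adds values: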

val ac = sc.accumulator[MyOwnType](MyOwnTypeObject, "my own type object accumulator")
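As a rough illustration of the custom-type case, here is a minimal sketch assuming the old accumulator API (Spark 1.x or 2.x on the classpath; it is deprecated from Spark 2.0 onward in favor of AccumulatorV2). The names SumCount, SumCountParam and CustomAccumulatorSketch are hypothetical and only stand in for MyOwnType; the param is passed explicitly here rather than as an implicit.

```scala
import org.apache.spark.{AccumulatorParam, SparkConf, SparkContext}

// Hypothetical custom type: a running (sum, count) pair.
case class SumCount(sum: Long, count: Long)

// Tells Spark how to merge two SumCount values; += relies on addInPlace.
object SumCountParam extends AccumulatorParam[SumCount] {
  // Identity element used when Spark initialises per-task copies.
  def zero(initialValue: SumCount): SumCount = SumCount(0L, 0L)

  def addInPlace(a: SumCount, b: SumCount): SumCount =
    SumCount(a.sum + b.sum, a.count + b.count)
}

object CustomAccumulatorSketch {
  // Runs a small job on a local master and returns the final accumulator value.
  def run(): SumCount = {
    val sc = new SparkContext(
      new SparkConf().setAppName("custom-accumulator").setMaster("local[2]"))
    try {
      // The param is supplied explicitly in the second argument list.
      val ac = sc.accumulator(SumCount(0L, 0L), "sum and count")(SumCountParam)
      // foreach is an action, so the tasks run right away.
      sc.parallelize(1 to 10).foreach(n => ac += SumCount(n, 1))
      ac.value // only the driver may read the value
    } finally sc.stop()
  }

  def main(args: Array[String]): Unit =
    println(run())
}
```

The only part specific to the custom type is SumCountParam; everything else is the same as with an Int accumulator.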

Your main code fragment would look like this:

val ac = sc.accumulator(0, "some accumulator")
a = a.map(x => someFunction(x, ac))
// map is lazy: run an action on a (for example a.count()) before reading ac.value
println("My accumulator value is: " + ac.value)

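Where the implementation of someFunction would look like this (Accumulator here is org.apache.spark.Accumulator):

def someFunction(x: TypeOfX, ac: Accumulator[Int]): ReturnType = {
  ac += 1  // tasks can only add to the accumulator
  ???      // placeholder: compute and return the ReturnType result here
}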
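Putting the pieces together, the following is a minimal end-to-end sketch, again assuming the old accumulator API (Spark 1.x or 2.x) and a local master. The Int types, the doubling logic and the names PassAccumulatorSketch and run are made up for illustration; they stand in for TypeOfX, ReturnType and whatever someFunction really does.

```scala
import org.apache.spark.{Accumulator, SparkConf, SparkContext}

object PassAccumulatorSketch {
  // Hypothetical stand-in for someFunction: doubles x and counts each call.
  def someFunction(x: Int, ac: Accumulator[Int]): Int = {
    ac += 1 // tasks can only add; they cannot read ac.value
    x * 2
  }

  // Runs the job on a local master and returns (results, number of calls).
  def run(): (Seq[Int], Int) = {
    val sc = new SparkContext(
      new SparkConf().setAppName("pass-accumulator").setMaster("local[2]"))
    try {
      val ac = sc.accumulator(0, "calls to someFunction")
      // The accumulator is passed like any other argument inside the closure.
      val doubled = sc.parallelize(1 to 5).map(x => someFunction(x, ac))
      // map is lazy, so nothing has been counted until an action runs.
      val results = doubled.collect().toSeq
      (results, ac.value) // read on the driver, after the action
    } finally sc.stop()
  }

  def main(args: Array[String]): Unit = {
    val (results, calls) = run()
    println(s"results=$results, calls=$calls")
  }
}
```

Note that updates made inside a transformation such as map can be applied more than once if Spark re-executes a task, so a counter like this is best treated as approximate outside of simple local runs.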