pythonic - 4 months ago
Scala Question

How can I pass Spark's accumulator to a function?

I want to do something like this.

val ac = sc.accumulator(0)
....
a = a.map(x => someFunction(x, the_accumulator_object))
....


What should go in the place of the_accumulator_object in the code above? Would simply writing ac there be fine?

Also, in the function

def someFunction(x: TypeOfX, a: TypeOfAccumulator) : ReturnType =
{
.....
}


What should go in the place of TypeOfAccumulator in the function above?

Answer

Additional info about Spark accumulators can be found here

According to the Scaladoc for creating an accumulator:

/**
 * Create an [[org.apache.spark.Accumulator]] variable of a given type, with a name for display
 * in the Spark UI. Tasks can "add" values to the accumulator using the += method. Only the
 * driver can access the accumulator's value.
 */

The default accumulator type is Int, inferred from the initial value 0. You can use your own type instead, but then you have to provide an implicit AccumulatorParam for that type so Spark knows how to combine its values:

val ac = sc.accumulator[MyOwnType](MyOwnTypeObject, "my own type object accumulator")
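
For example, a custom accumulator type can be wired up roughly like this (a minimal sketch assuming the Spark 1.x accumulator API, written as it would appear in spark-shell where sc already exists; MyOwnType and its fields are made-up placeholders):

import org.apache.spark.AccumulatorParam

// Hypothetical custom type: a pair of counters.
case class MyOwnType(hits: Long, misses: Long)

// Spark needs an implicit AccumulatorParam that defines the "zero" value
// and how to combine two values of the custom type.
implicit object MyOwnTypeAccumulatorParam extends AccumulatorParam[MyOwnType] {
  def zero(initial: MyOwnType): MyOwnType = MyOwnType(0L, 0L)
  def addInPlace(a: MyOwnType, b: MyOwnType): MyOwnType =
    MyOwnType(a.hits + b.hits, a.misses + b.misses)
}

// With the implicit in scope, sc.accumulator resolves it automatically.
val customAc = sc.accumulator(MyOwnType(0L, 0L), "my own type object accumulator")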

Your main code fragment will then look like this:

val ac = sc.accumulator(0, "some accumulator")
....
a = a.map(x => someFunction(x, ac))
....
// note: map is lazy, so the accumulator is only updated once an action (e.g. a.count()) runs
println("My accumulator value is: " + ac.value)

Where the someFunction method implementation will look like:

def someFunction(x: TypeOfX, ac: Accumulator[Int]) : ReturnType =
{
    ...
    ac += 1
    ...
}
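
Putting it all together, here is a minimal self-contained sketch, assuming the Spark 1.x API; TypeOfX, ReturnType, the sample data and the run method are placeholder names, not part of the original question:

import org.apache.spark.{Accumulator, SparkContext}
import org.apache.spark.rdd.RDD

object AccumulatorExample {
  // Placeholder aliases so the sketch compiles on its own.
  type TypeOfX = String
  type ReturnType = Int

  // The accumulator parameter is simply typed Accumulator[Int];
  // tasks may only add to it, never read its value.
  def someFunction(x: TypeOfX, ac: Accumulator[Int]): ReturnType = {
    ac += 1
    x.length // stand-in for the real computation
  }

  def run(sc: SparkContext): Unit = {
    val ac = sc.accumulator(0, "some accumulator")
    val a: RDD[TypeOfX] = sc.parallelize(Seq("foo", "bar", "baz"))

    val mapped = a.map(x => someFunction(x, ac))
    mapped.count() // an action forces the lazy map, so the accumulator updates run

    // Only the driver can read the accumulator's value.
    println("My accumulator value is: " + ac.value)
  }
}

So yes, passing ac directly into the function is fine: the parameter type to use is Accumulator[Int] (org.apache.spark.Accumulator), and the accumulator object is serialized into the task closure while its value stays readable only on the driver.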