user1058511 - 1 year ago 74

Scala Question

I ran into this line in the Spark source:

```scala
val (gradientSum, lossSum, miniBatchSize) = data.sample(false, miniBatchFraction, 42 + i)
  .treeAggregate((BDV.zeros[Double](n), 0.0, 0L))(
    seqOp = (c, v) => {
      // c: (grad, loss, count), v: (label, features)
      val l = gradient.compute(v._2, v._1, bcWeights.value, Vectors.fromBreeze(c._1))
      (c._1, c._2 + l, c._3 + 1)
    },
    combOp = (c1, c2) => {
      // c: (grad, loss, count)
      (c1._1 += c2._1, c1._2 + c2._2, c1._3 + c2._3)
    })
```

I have several problems reading this. First, I can't find anything on the web that explains exactly how `treeAggregate` works or what its parameters mean. Second, `.treeAggregate` seems to have two sets of parentheses `()()` following the method name. What does that mean? Is it some special Scala syntax that I don't understand? Finally, I see that both `seqOp` and `combOp` return a 3-element tuple that matches the variables on the left-hand side, but which one actually gets returned?

This statement must be really advanced; I can't begin to decipher it.

Answer

`treeAggregate` is a specialized implementation of `aggregate` that iteratively applies the combine function to subsets of the partitions. This is done to avoid sending all partial results to the driver, which is what the classic `aggregate` does before performing a single-pass reduce there.
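To make the tree idea concrete, here is a minimal single-machine sketch, assuming "partitions" are just nested `Seq`s. `localTreeAggregate` is a hypothetical name, not a Spark API: it only mirrors the shape of `treeAggregate(zeroValue)(seqOp, combOp, depth)` and loosely imitates the level-by-level merging. Spark's real implementation shuffles partial results between executors, which this sketch does not model.

```scala
// Hypothetical, single-machine sketch (not Spark's API): each inner Seq plays
// the role of one partition. Assumes an immutable accumulator type U, because
// `zero` is reused for every partition.
def localTreeAggregate[T, U](partitions: Seq[Seq[T]])(zero: U)(
    seqOp: (U, T) => U,
    combOp: (U, U) => U,
    depth: Int = 2): U = {
  require(depth >= 1, "depth must be >= 1")
  // Level 0: fold each partition into one partial result with seqOp.
  var partials: Seq[U] = partitions.map(_.foldLeft(zero)(seqOp))
  // Fan-in per level, chosen so that roughly `depth` levels are needed.
  val scale = math.max(math.ceil(math.pow(partials.size.toDouble, 1.0 / depth)).toInt, 2)
  // Intermediate levels: merge groups of `scale` partials with combOp,
  // so fewer partial results are left for the final step.
  while (partials.size > scale) {
    partials = partials.grouped(scale).map(_.reduce(combOp)).toList
  }
  // Final step (the "driver" in Spark): merge whatever is left.
  partials.foldLeft(zero)(combOp)
}

// Four "partitions" of Ints; the accumulator is (sum, count).
val parts = Seq(Seq(1, 2), Seq(3), Seq(4, 5, 6), Seq(7, 8, 9, 10))
val (sum, count) = localTreeAggregate(parts)((0, 0L))(
  seqOp = (acc, x) => (acc._1 + x, acc._2 + 1),
  combOp = (a, b) => (a._1 + b._1, a._2 + b._2))
println(s"sum=$sum count=$count") // sum=55 count=10
```

Whatever the grouping, the answer is the same as a flat fold, provided `combOp` is associative and `zero` is neutral; the tree shape only changes where the merging happens.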

For all practical purposes, `treeAggregate` follows the same principle as `aggregate`, explained in this answer: Explain the aggregate functionality in Python, with the exception that it takes an extra parameter indicating the depth of the partial aggregation.

Let me try to explain what's going on here specifically:

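For `aggregate`, we need a zero value, a combiner function and a reduce function. `aggregate` uses currying (multiple parameter lists) to specify the zero value independently of the combine and reduce functions; that is what the two sets of parentheses `()()` are. In the Spark code, the combiner is passed as `seqOp` and the reducer as `combOp`, using named arguments.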
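The `()()` is ordinary Scala syntax: a method can declare several parameter lists. The hypothetical `foldWith` below (not a Spark or Scala library method) has the same shape as `treeAggregate(zeroValue)(seqOp, combOp)`. Because the earlier lists fix the types, the compiler can infer the lambda parameter types in the last list, and that list can be filled with named arguments, just like `seqOp = ...` in the Spark code.

```scala
// Hypothetical method with three parameter lists; `zero` fixes U before seqOp is typed.
def foldWith[T, U](xs: Seq[T])(zero: U)(seqOp: (U, T) => U): U =
  xs.foldLeft(zero)(seqOp)

// No types needed on (acc, s): U = Int and T = String are known from the earlier lists.
// The last list is passed with a named argument, like `seqOp = ...` in Spark.
val total = foldWith(Seq("a", "bb", "ccc"))(0)(seqOp = (acc, s) => acc + s.length)
println(total) // 6

// Supplying only the first lists gives back a function that still needs seqOp.
val startAt10: ((Int, String) => Int) => Int = foldWith(Seq("x", "y"))(10)
println(startAt10((acc, _) => acc + 1)) // 12
```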

We can then dissect the call above like this; hopefully that helps understanding:

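```scala
import breeze.linalg.{DenseVector => BDV}
import org.apache.spark.mllib.linalg.{Vector, Vectors}

// `n`, `gradient` and `bcWeights` come from the surrounding Spark code.
val Zero: (BDV[Double], Double, Long) = (BDV.zeros[Double](n), 0.0, 0L)

val combinerFunction: ((BDV[Double], Double, Long), (Double, Vector)) => (BDV[Double], Double, Long) =
  (c, v) => {
    // c: (grad, loss, count), v: (label, features)
    // compute() adds this point's gradient into c._1 and returns its loss
    val l = gradient.compute(v._2, v._1, bcWeights.value, Vectors.fromBreeze(c._1))
    (c._1, c._2 + l, c._3 + 1)
  }

val reducerFunction: ((BDV[Double], Double, Long), (BDV[Double], Double, Long)) => (BDV[Double], Double, Long) =
  (c1, c2) => {
    // c: (grad, loss, count); += adds c2's gradient into c1's in place
    (c1._1 += c2._1, c1._2 + c2._2, c1._3 + c2._3)
  }
```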

Then we can rewrite the call to `treeAggregate` in a more digestible form:

```scala
val (gradientSum, lossSum, miniBatchSize) = data.sample(false, miniBatchFraction, 42 + i)
  .treeAggregate(Zero)(combinerFunction, reducerFunction)
```

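This form "extracts" (destructures) the resulting tuple into the named values `gradientSum`, `lossSum` and `miniBatchSize` for further use.

As for which function's tuple ends up there: `seqOp` folds the elements of each partition into a partial accumulator, and `combOp` merges those partial accumulators. Both must return the same accumulator type, which is why both return a 3-tuple. The value bound on the left-hand side is the final accumulator, after the per-partition results have been merged with `combOp`.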
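As a dependency-free illustration of the merge step, the sketch below replaces Breeze's `BDV` with an immutable Scala `Vector[Double]` (an assumption; the real code mutates the Breeze vector in place with `+=`) and merges two made-up per-partition accumulators the way `combOp` does. The merged tuple is what the pattern on the left-hand side destructures.

```scala
// Stand-in accumulator: (gradient sum, loss sum, count). scala.Vector[Double]
// replaces Breeze's DenseVector here purely to avoid the dependency.
type Acc = (Vector[Double], Double, Long)

// Same shape as the combOp above, but immutable: element-wise add the
// gradients, then sum the losses and the counts.
val combOp: (Acc, Acc) => Acc = (c1, c2) =>
  (c1._1.zip(c2._1).map { case (a, b) => a + b }, c1._2 + c2._2, c1._3 + c2._3)

// Two hypothetical per-partition results, as seqOp would have produced them.
val partA: Acc = (Vector(1.0, 2.0), 0.5, 3L)
val partB: Acc = (Vector(0.5, 0.5), 0.25, 2L)

// The merged tuple is destructured into three named values.
val (gradientSum, lossSum, miniBatchSize) = combOp(partA, partB)
println(s"$gradientSum $lossSum $miniBatchSize") // Vector(1.5, 2.5) 0.75 5
```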

Note that `treeAggregate` takes an additional parameter, `depth`, which is declared with a default value (`depth = 2`). Since it is not provided in this particular call, it takes that default value.
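Default arguments are plain Scala, too. The hypothetical `describe` method below only mimics the shape of `treeAggregate`'s second parameter list, with `depth: Int = 2` last. Leaving `depth` out picks up the default, and it can still be overridden by name.

```scala
// Hypothetical method shaped like treeAggregate(zeroValue)(seqOp, combOp, depth = 2);
// it does no aggregation, it only reports which depth it received.
def describe[U](zero: U)(seqOp: (U, Int) => U, combOp: (U, U) => U, depth: Int = 2): String =
  s"zero=$zero depth=$depth"

val withDefault = describe(0)(_ + _, _ + _)             // depth omitted
val withExplicit = describe(0)(_ + _, _ + _, depth = 4) // depth overridden by name
println(withDefault)  // zero=0 depth=2
println(withExplicit) // zero=0 depth=4
```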