dmbaker - 6 months ago
Python Question

Pyspark combineByKey list of tuples and count

I am having difficulty combining keyed tuples of two floats in an RDD into, for each key, a list of those tuples together with their count. I am using rdd.combineByKey to do this. If I pass values to the three lambdas outside of combineByKey, I get the results I expect: a key and a tuple containing the list of tuples and the count. But when I do this with combineByKey, I get a key and a tuple containing a list of one constant value and the count. What might I be missing here? I captured the REPL output below to illustrate.

>>> j = jnd.take(2)
>>> j
[[u'14225532965000', (0.2072, 82777.0)], [u'14217732824000', (0.20361902000000001, 77271.0)]]
>>> creCmb = (lambda v: ([v[1]], 1))
>>> mrgVal = (lambda x, v: (x[0]+[v[1]], x[1]+1))
>>> mrgCmb = (lambda x, y: (x[0]+y[0], x[1]+y[1]))
>>> x = creCmb(j[0])
>>> x
([(0.2072, 82777.0)], 1)
>>> m = mrgVal(x, j[1])
>>> m
([(0.2072, 82777.0), (0.20361902000000001, 77271.0)], 2)
>>> r = mrgCmb(m, m)
>>> r
([(0.2072, 82777.0), (0.20361902000000001, 77271.0), (0.2072, 82777.0), (0.20361902000000001, 77271.0)], 4)
>>> cmb = jnd.combineByKey(creCmb, mrgVal, mrgCmb)
>>> cmb.count()
4513
>>> cmb.take(1)
[(u'14225532026000', ([56917.0, 56917.0, 56917.0, 56917.0, 56917.0, 56917.0, 56917.0, 56917.0, 56917.0, 56917.0, 56917.0, 56917.0, 56917.0, 56917.0, 56917.0, 56917.0, 56917.0, 56917.0, 56917.0, 56917.0, 56917.0, 56917.0, 56917.0, 56917.0, 56917.0, 56917.0, 56917.0, 56917.0, 56917.0, 56917.0, 56917.0, 56917.0, 56917.0, 56917.0, 56917.0, 56917.0, 56917.0, 56917.0, 56917.0, 56917.0, 56917.0, 56917.0, 56917.0, 56917.0, 56917.0, 56917.0, 56917.0, 56917.0, 56917.0, 56917.0, 56917.0, 56917.0, 56917.0, 56917.0, 56917.0, ...
... 56917.0, 56917.0, 56917.0, 56917.0, 56917.0, 56917.0, 56917.0, 56917.0, 56917.0, 56917.0, 56917.0, 56917.0, 56917.0], 741))]

Answer

combineByKey calls the functions you pass it only on the value of each pair, never on the key. Your RDD's values are the two-float tuples, so in creCmb and mrgVal the expression v[1] picks out the second float of each tuple instead of the tuple itself, and you end up with a list of single floats. In your test, you called creCmb and mrgVal on the whole key-value pairs j[0] and j[1], where v[1] is the value tuple, which is why the test output looks correct but the combineByKey output does not.
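The difference can be reproduced without Spark. The sketch below applies the question's original creCmb once to a whole row, as the REPL test did, and once to the value alone, which is what combineByKey passes in. The row is copied from j[0] in the question; the names from_row and from_value are only illustrative.

```python
# First row from the question: [key, (float, float)]
row = [u'14225532965000', (0.2072, 82777.0)]
key, value = row

# Original combiner from the question, with the [1] indexing
creCmb = lambda v: ([v[1]], 1)

# The REPL test passed the whole row, so v[1] is the value tuple
from_row = creCmb(row)

# combineByKey passes only the value, so v[1] is the tuple's second float
from_value = creCmb(value)

print(from_row)
print(from_value)
```

The first result holds the full tuple, while the second holds only a bare float, which matches the shape of the list seen in the cmb.take(1) output.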

To fix this, simply remove the [1] indexing from both creCmb and mrgVal:

creCmb = (lambda v: ([v], 1))
mrgVal = (lambda x, v: (x[0]+[v], x[1]+1))
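
As a rough check that does not need a Spark cluster, the corrected lambdas can be run through a small single-partition stand-in for combineByKey. The helper combine_by_key_local and the sample pairs are assumptions made up for this sketch; this is not how Spark implements the operation, and mrgCmb is left out because there is only one partition to merge.

```python
def combine_by_key_local(pairs, create_combiner, merge_value):
    """Hypothetical single-partition stand-in for combineByKey."""
    combined = {}
    for key, value in pairs:
        if key in combined:
            # Key seen before: fold the new value into its combiner
            combined[key] = merge_value(combined[key], value)
        else:
            # First value for this key: build the initial combiner
            combined[key] = create_combiner(value)
    return combined

# Corrected functions: v is already the value tuple
creCmb = lambda v: ([v], 1)
mrgVal = lambda x, v: (x[0] + [v], x[1] + 1)

# Made-up sample data with the same shape as the question's RDD
pairs = [
    ("a", (0.1, 1.0)),
    ("b", (0.2, 2.0)),
    ("a", (0.3, 3.0)),
]

result = combine_by_key_local(pairs, creCmb, mrgVal)
print(result["a"])
print(result["b"])
```

Each key now maps to a list of complete two-float tuples plus their count, which is the shape the question was after.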