Lior Lior - 10 months ago 52
Python Question

Spark: How to "reduceByKey" when the keys are numpy arrays which are not hashable?

I have an RDD of (key, value) elements. The keys are NumPy arrays. NumPy arrays are not hashable, and this causes a problem when I try to do a reduceByKey operation.

Is there a way to supply the Spark context with my manual hash function? Or is there any other way around this problem (other than actually hashing the arrays "offline" and passing to Spark just the hashed key)?

Here is an example:

import numpy as np
from pyspark import SparkContext

sc = SparkContext()

data = np.array([[1,2,3],[4,5,6],[1,2,3],[4,5,6]])
rd = sc.parallelize(data).map(lambda x: (x,np.sum(x))).reduceByKey(lambda x,y: x+y)

The error is:

An error occurred while calling


TypeError: unhashable type: 'numpy.ndarray'

Answer

The simplest solution is to convert each key to a hashable object, such as a tuple. For example:

from operator import add

reduced = sc.parallelize(data).map(
    lambda x: (tuple(x), x.sum())  # tuple(x) is hashable, unlike the ndarray
).reduceByKey(add)

and convert it back later if needed.
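The round trip might look roughly like the sketch below. It assumes 1-D arrays as keys; the helper names to_key and from_key are hypothetical, and the dictionary loop is only a local stand-in for reduceByKey so that the snippet runs without a Spark context.

```python
from collections import defaultdict

import numpy as np


def to_key(arr):
    # Hypothetical helper: a tuple of Python scalars is hashable (1-D arrays assumed).
    return tuple(arr.tolist())


def from_key(key):
    # Hypothetical helper: rebuild an array from the tuple key.
    return np.array(key)


data = np.array([[1, 2, 3], [4, 5, 6], [1, 2, 3], [4, 5, 6]])

# Local stand-in for the Spark step, which would be something like:
#   sc.parallelize(data).map(lambda x: (to_key(x), x.sum())).reduceByKey(add)
totals = defaultdict(int)
for row in data:
    totals[to_key(row)] += int(row.sum())

# Convert the keys back to arrays once the reduction is done.
result = [(from_key(k), v) for k, v in totals.items()]
```

Note that a tuple key does not carry the original dtype, so from_key may need an explicit dtype argument if that matters.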

"Is there a way to supply the Spark context with my manual hash function?"

Not a straightforward one. The whole mechanism depends on the fact that the key object implements a __hash__ method, and C extension types such as numpy.ndarray cannot be monkey patched. You could try to use dispatching to override pyspark.rdd.portable_hash, but I doubt it is worth it, even if you consider the cost of conversions.
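Since the mechanism relies on __hash__, another option is to wrap each array in a small class that supplies its own __hash__ and __eq__. The sketch below is only an illustration: HashableArray is a hypothetical name, it is exercised here with a plain dict rather than with Spark, and whether such a wrapper serializes cleanly to Spark workers has not been verified. It also assumes the wrapped arrays hold integers or floats and are not mutated after being used as keys.

```python
import numpy as np


class HashableArray:
    """Hypothetical wrapper giving an ndarray value-based hashing and equality."""

    def __init__(self, arr):
        self.arr = np.asarray(arr)

    def __hash__(self):
        # Hash the shape plus the flattened values as plain Python numbers.
        return hash((self.arr.shape, tuple(self.arr.ravel().tolist())))

    def __eq__(self, other):
        # Equal only to another wrapper holding an element-wise equal array.
        return isinstance(other, HashableArray) and np.array_equal(self.arr, other.arr)


a = HashableArray([1, 2, 3])
b = HashableArray(np.array([1, 2, 3]))

# Equal arrays land on the same dictionary key.
counts = {}
for key in (a, b, HashableArray([4, 5, 6])):
    counts[key] = counts.get(key, 0) + 1
```

Compared with the tuple conversion, this keeps the original array available as key.arr, at the cost of an extra class to maintain.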