Agustin Luques - 1 month ago
Python Question

python - Lookup from an RDD inside a map of another RDD

I have two RDDs:


  • RDD1 = (word, score) #word: string | score: int

  • RDD2 = (id, text) #id: int | text: list of words



So, for each 'id' in RDD2, I want to calculate the mean score of the words in 'text' that have a score in RDD1:

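def predecir(texto):
    # texto: list of words; listaRDD is RDD1 of (word, score) pairs
    contador = 0    # number of words that have a score
    prediccion = 0  # running sum of those scores
    for palabra in texto:
        # Calling lookup() on another RDD inside a map is what triggers SPARK-5063
        puntaje = listaRDD.lookup(palabra)
        if puntaje:
            puntaje = puntaje[0]
            prediccion += puntaje
            contador += 1
    return float(prediccion) / contador if contador else 0.0

listaTestRDD = listaTestRDD.map(lambda x: (x[0], predecir(x[1])))
print(listaTestRDD.take(1))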


And I get this error message


Exception: It appears that you are attempting to broadcast an RDD or
reference an RDD from an action or transformation. RDD transformations
and actions can only be invoked by the driver, not inside of other
transformations; for example, rdd1.map(lambda x: rdd2.values.count() *
x) is invalid because the values transformation and count action
cannot be performed inside of the rdd1.map transformation. For more
information, see SPARK-5063.


How can I solve this? Can't I use one RDD inside another? How can I convert RDD1 to a dictionary so that I can look up a word in O(1)?
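A minimal sketch of the dictionary approach, assuming RDD1 is small enough to collect to the driver. In PySpark, `RDD1.collectAsMap()` returns a plain Python dict, and `sc.broadcast(...)` ships it once to every executor, where `.value` reads it inside the map; unlike an RDD, a broadcast variable may be referenced from inside a transformation. The helper `mean_score` and the SparkContext name `sc` are hypothetical; the per-record logic is kept as a plain function so it can be checked without a cluster.

```python
def mean_score(words, score_by_word):
    """Mean score of the words found in score_by_word; 0.0 if none are found."""
    total = 0
    count = 0
    for word in words:
        score = score_by_word.get(word)  # average O(1) dict lookup
        if score is not None:
            total += score
            count += 1
    return total / float(count) if count else 0.0


# Hypothetical Spark wiring (assumes a SparkContext named `sc` and that
# RDD1 fits in driver memory):
#   scores = sc.broadcast(RDD1.collectAsMap())
#   result = RDD2.mapValues(lambda text: mean_score(text, scores.value))

if __name__ == "__main__":
    local_scores = {"good": 3, "bad": -2}
    print(mean_score(["good", "movie", "bad", "good"], local_scores))
```

If RDD1 is too large to collect, this approach does not apply and a join is the usual alternative.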

Answer

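Instead of calling lookup() inside the map, flatten RDD2 into (word, id) pairs, join them with RDD1, and average the scores per id: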

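(RDD2.flatMapValues(lambda x: x)    # (id, word): one pair per word
    .map(lambda x: (x[1], x[0]))    # (word, id)
    .leftOuterJoin(RDD1)            # (word, (id, score or None))
    .values()                       # (id, score or None)
    .map(lambda x: (x[0], (x[1], 1) if x[1] is not None else (0, 0)))  # (id, (score, 1)) or (id, (0, 0))
    .reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))             # (id, (sum, count))
    .mapValues(lambda x: x[0] / float(x[1]) if x[1] else 0.0))        # mean; 0.0 if no word was scored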
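To make the intermediate shapes of the join chain concrete, here is a plain-Python simulation on hypothetical sample data. It mimics flatMapValues, leftOuterJoin, and reduceByKey with lists and dicts; it is an illustration of the data flow, not Spark code, and the function name `simulate_join_pipeline` is made up for this sketch.

```python
from collections import defaultdict

# Hypothetical sample data in the shapes described in the question.
rdd1 = [("good", 3), ("bad", -2)]                      # (word, score)
rdd2 = [(1, ["good", "movie", "bad"]), (2, ["plot"])]  # (id, text)


def simulate_join_pipeline(rdd1, rdd2):
    # flatMapValues(lambda x: x): one (id, word) pair per word
    pairs = [(doc_id, word) for doc_id, text in rdd2 for word in text]
    # map(lambda x: (x[1], x[0])): key by word -> (word, id)
    by_word = [(word, doc_id) for doc_id, word in pairs]
    # leftOuterJoin(RDD1): (word, (id, score)), score is None when unmatched
    score_lists = defaultdict(list)
    for word, score in rdd1:
        score_lists[word].append(score)
    joined = []
    for word, doc_id in by_word:
        for score in score_lists.get(word, [None]):
            joined.append((word, (doc_id, score)))
    # values(): drop the word key -> (id, score or None)
    values = [v for _, v in joined]
    # map: (id, (score, 1)) for scored words, (id, (0, 0)) otherwise
    tagged = [(doc_id, (score, 1) if score is not None else (0, 0))
              for doc_id, score in values]
    # reduceByKey: element-wise sum of (total, count) per id
    totals = {}
    for doc_id, (s, c) in tagged:
        t, n = totals.get(doc_id, (0, 0))
        totals[doc_id] = (t + s, n + c)
    # mapValues: mean, or 0.0 when no word in the text had a score
    return {doc_id: (t / float(n) if n else 0.0)
            for doc_id, (t, n) in totals.items()}


print(simulate_join_pipeline(rdd1, rdd2))  # {1: 0.5, 2: 0.0}
```

One consequence of this design: an id whose text is an empty list produces no pairs in flatMapValues, so it is absent from the result rather than mapped to 0.0.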