Python Question

What is the equivalent of this Spark (Scala) code in PySpark?

val aggregatedBigrams = bigramTokens.reduceByKey({(x:(Int,Int), y:(Int,Int)) => (x._1+y._1, x._2+y._2)}, numReducers)


I've seen a lot of Spark code written like the above, but I am truly confused by the use of the underscore, such as x._1. I searched the internet and was told that the underscore means getting an element of a tuple, so I assume x._1 = x(0). So in PySpark, should I write x[0]?

Also, why do I have to write out the type, like x: (Int, Int)? Do I have to do something similar in PySpark?

Answer

In Scala, the syntax (x: T) => y denotes an anonymous function: the part before =>, here (x: T), declares the function's arguments, and the part after, here y, is the return value. In your example the arguments are (x: (Int, Int), y: (Int, Int)), which means the function takes two arguments, x and y, both expected to be 2-tuples of integers. The return value is another 2-tuple of integers.

The equivalent of a Scala anonymous function in Python is a lambda function. Defining a lambda with two arguments looks like lambda x, y: .... Python doesn't require explicit types, so you don't have to declare the arguments to be tuples of integers as in Scala. With Python's duck-typing philosophy, all that matters is that whatever is passed in supports the operators you use (indexing and addition). You can still add type hints in modern Python, but you don't have to.
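For instance, both of the following define the same pairwise addition (the names add_pairs and add_pairs_hinted are just illustrative). Note that the hinted version uses a regular def, because a lambda cannot carry parameter annotations:

from typing import Tuple

# A plain lambda: no type annotations needed, it works for any values
# that support indexing and +.
add_pairs = lambda x, y: (x[0] + y[0], x[1] + y[1])

# Optional: a named function with type hints.
def add_pairs_hinted(x: Tuple[int, int], y: Tuple[int, int]) -> Tuple[int, int]:
    return (x[0] + y[0], x[1] + y[1])

print(add_pairs((1, 2), (3, 4)))         # (4, 6)
print(add_pairs_hinted((1, 2), (3, 4)))  # (4, 6)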

As you said, tuple indexing in Python is done with [i], so your full code would look like:

aggregatedBigrams = bigramTokens.reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]), numReducers)
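
For context, here is a minimal self-contained sketch of how that line could run. The sample bigram data and the value layout (a pair of counts per bigram key) are assumptions for illustration:

from pyspark import SparkContext

sc = SparkContext("local[*]", "bigram-aggregation")

# Assumed shape: ((word1, word2), (count_a, count_b)) pairs.
bigramTokens = sc.parallelize([
    (("to", "be"), (1, 1)),
    (("be", "or"), (1, 0)),
    (("to", "be"), (2, 1)),
])

numReducers = 2  # number of partitions used for the shuffle

aggregatedBigrams = bigramTokens.reduceByKey(
    lambda x, y: (x[0] + y[0], x[1] + y[1]), numReducers)

print(aggregatedBigrams.collect())
# e.g. [(('be', 'or'), (1, 0)), (('to', 'be'), (3, 2))]

sc.stop()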