Yu Watanabe - 2 months ago
Python Question

Input and Output of function in pyspark

I'd like to learn how function inputs work in PySpark.

My platform is below.

Red Hat Enterprise Linux Server release 6.8 (Santiago)
spark version 1.6.2
python 2.6


I have a function defined in the module basic_lib.py as below.

def selectRowByTimeStamp(x, y):
    if x._1 > y._1:
        return x
    return y


Below is a snippet of my main code:

df2 = df2.map(lambda x: (x._2, x))
rdd = df2.reduceByKey(basic_lib.selectRowByTimeStamp)


Why does the above basic_lib.selectRowByTimeStamp work without explicitly
specifying its input parameters?

For example, something like the code below would be easier for me to understand:

var1 = 1
var2 = 2

rdd = df2.reduceByKey(basic_lib.selectRowByTimeStamp(var1, var2))

Answer

It looks like you're slightly confused about what exactly the purpose of lambda expressions is. In general, lambda expressions in Python are used to create anonymous, single-expression functions. Other than that, as far as we care here, they are no different from any other function you define. To quote the docs:

Small anonymous functions can be created with the lambda keyword. (...) Lambda functions can be used wherever function objects are required. Semantically, they are just syntactic sugar for a normal function definition.

Since lambda functions are not special in Python in general, they cannot be special in PySpark (well, they may require some serialization tricks due to their scope, but that is only about their scope). No matter whether a function is defined with lambda or not (or whether it is even a function*), Spark applies it in exactly the same way. So when you call:

df2.map(lambda x: (x._2, x))

the lambda expression is simply evaluated, and what map receives is just another function object. It wouldn't be any different if you assigned it first:

foo = lambda x: (x._2, x)  # Yup, this is against style guide (PEP 8)

or created a standalone function:

def bar(x):
    return x._2, x 

In all three cases the function object is functionally pretty much the same:

import dis

dis.dis(foo)
##   1           0 LOAD_FAST                0 (x)
##               3 LOAD_ATTR                0 (_2)
##               6 LOAD_FAST                0 (x)
##               9 BUILD_TUPLE              2
##              12 RETURN_VALUE

dis.dis(bar)
##   2           0 LOAD_FAST                0 (x)
##               3 LOAD_ATTR                0 (_2)
##               6 LOAD_FAST                0 (x)
##               9 BUILD_TUPLE              2
##              12 RETURN_VALUE

dis.dis(lambda x: (x._2, x))
##   1           0 LOAD_FAST                0 (x)
##               3 LOAD_ATTR                0 (_2)
##               6 LOAD_FAST                0 (x)
##               9 BUILD_TUPLE              2
##              12 RETURN_VALUE
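This is also why you pass basic_lib.selectRowByTimeStamp itself rather than basic_lib.selectRowByTimeStamp(var1, var2): reduceByKey needs the function object so it can call it repeatedly on pairs of values sharing the same key, whereas calling it yourself would pass a single result instead of the function. Below is a minimal plain-Python sketch of what reduceByKey conceptually does with the binary function you hand it (group_and_reduce and the toy data are made up for illustration):

from collections import defaultdict
from functools import reduce

def selectRowByTimeStamp(x, y):
    # Same logic as basic_lib.selectRowByTimeStamp, but on plain tuples
    # where element [0] plays the role of _1 (the timestamp).
    return x if x[0] > y[0] else y

def group_and_reduce(pairs, func):
    # Hypothetical stand-in for reduceByKey: group values by key,
    # then fold each group with the supplied binary function.
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return dict((key, reduce(func, values)) for key, values in groups.items())

# (key, row) pairs, comparable to the output of df2.map(lambda x: (x._2, x))
data = [("a", (1, "a")), ("a", (3, "a")), ("b", (2, "b"))]
print(group_and_reduce(data, selectRowByTimeStamp))
## {'a': (3, 'a'), 'b': (2, 'b')}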

On a side note, if the input here is a DataFrame, it is much more efficient to solve this using Spark SQL. Also, it is better to extract the rdd before you use map, to ensure forward compatibility. Finally, a Row is just a tuple.

So optimally you could:

df.groupBy("_2").max()

but if you really want to use the RDD API:

df.select("_2", "_1").rdd.reduceByKey(max)

* In practice any callable object will work, as long as it accepts the given arguments. For example (not that it makes much sense here) you could replace the function with an object of a class defined as follows:

class FooBar(object):
    def __call__(self, x):
        return x._2, x

df2.rdd.map(FooBar())
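As a quick sanity check outside of any Spark job (this reuses the FooBar class defined above; a pyspark.sql.Row can be constructed locally without a SparkContext):

from pyspark.sql import Row

fb = FooBar()          # an instance of the class above, used as a function
r = Row(_1=1, _2="a")  # toy row with the same field names as in the question

print(callable(fb))
## True
print(fb(r))
## ('a', Row(_1=1, _2='a'))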