
withColumn not allowing me to use max() function to generate a new column

I have a dataset like this:

a = sc.parallelize([[1,2,3],[0,2,1],[9,8,7]]).toDF(["one", "two", "three"])


I want to have a dataset that adds a new column that is equal to the largest value in the other three columns.
The output would look like this:

+----+----+-----+-------+
|one |two |three|max_col|
+----+----+-----+-------+
| 1| 2| 3| 3|
| 0| 2| 1| 2|
| 9| 8| 7| 9|
+----+----+-----+-------+


I thought I would use withColumn, like so:

b = a.withColumn("max_col", max(a["one"], a["two"], a["three"]))


but this yields the error

Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/opt/spark152/python/pyspark/sql/column.py", line 418, in __nonzero__
raise ValueError("Cannot convert column into bool: please use '&' for 'and', '|' for 'or', "
ValueError: Cannot convert column into bool: please use '&' for 'and', '|' for 'or', '~' for 'not' when building DataFrame boolean expressions.


Odd. Does max return a bool? Not according to the documentation on max. Okay. Weird.

I find it odd that this works:

b = a.withColumn("max_col", a["one"] + a["two"] + a["three"]))


And the fact that it works makes me think even more strongly that max is behaving in some way I don't understand.

I also tried b = a.withColumn("max_col", max([a["one"], a["two"], a["three"]])), which passes the three columns in as a list rather than as 3 separate elements. This yields the same error as above.

Answer

What you actually need here is greatest, not max:

from pyspark.sql.functions import greatest

a.withColumn("max_col", greatest(a["one"], a["two"], a["three"]))

And just for completeness you can use least to find the minimum:

from pyspark.sql.functions import least

a.withColumn("min_col", least(a["one"], a["two"], a["three"]))

Regarding the error you see, it is quite simple: the built-in max depends on rich comparisons. When you compare two columns you get a Column:

type(col("a") < col("b")
## pyspark.sql.column.Column

PySpark explicitly forbids converting columns to booleans (you can check the Column.__nonzero__ source) because it is simply meaningless: a comparison only builds a logical expression, which cannot be evaluated in the driver context.
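You can reproduce the failure directly. Python's built-in max compares its arguments pairwise, and each comparison of two Columns yields another Column, which max then tries to coerce to bool. A minimal sketch:

from pyspark.sql.functions import col

# max() internally does something like `if col("two") > col("one"): ...`,
# which calls bool() on the resulting Column and raises the ValueError above.
try:
    max(col("one"), col("two"))
except ValueError as e:
    print(e)  # Cannot convert column into bool: ...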