Karol Sudol - 1 year ago
Scala Question

spark UDF pattern matching for multiple columns and collection elements

Given df as below:

+---+---+-----+
|One|Two|Three|
+---+---+-----+
|  1|  2|    3|
|  3|  2|    1|
+---+---+-----+

val df = spark.createDataFrame(Seq(
(1, 2, 3),
(3, 2, 1)
)).toDF("One", "Two", "Three")


with schema:

root
 |-- One: integer (nullable = false)
 |-- Two: integer (nullable = false)
 |-- Three: integer (nullable = false)

I would like to write a udf that takes the three columns as input and returns a new column based on the highest input value, similar to the code below:

import org.apache.spark.sql.functions.udf


def udfScoreToCategory = udf((One: Int, Two: Int, Three: Int): Int => {
  cols match {
    case cols if One > Two && One > Three => 1
    case cols if Two > One && Two > Three => 2
    case _ => 0
  }}


It would also be interesting to see how to do something similar with a vector type as input:

import org.apache.spark.ml.linalg.Vector

def udfVectorToCategory = udf((cols: org.apache.spark.ml.linalg.Vector): Int => {
  cols match {
    case cols if cols(0) > cols(1) && cols(0) > cols(2) => 1,
    case cols if cols(1) > cols(0) && cols(1) > cols(2) => 2
    case _ => 0
  }})

Answer

Some problems:

  • cols in the first example is not in scope.
  • (...): T => ... is not valid syntax for an anonymous function.
  • It is better to use val over def here.

One way to define this:

val udfScoreToCategory = udf[Int, Int, Int, Int] {
  (one, two, three) => (one, two, three) match {
    case _ if one > two && one > three => 1
    case _ if two > one && two > three => 2
    case _ => 0
  }
}
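A quick usage sketch, assuming the df from the question and spark.implicits._ in scope for the $"..." column syntax:

import spark.implicits._

df.withColumn("category", udfScoreToCategory($"One", $"Two", $"Three")).show()
// +---+---+-----+--------+
// |One|Two|Three|category|
// +---+---+-----+--------+
// |  1|  2|    3|       0|   <- neither guard matches when Three is largest
// |  3|  2|    1|       1|
// +---+---+-----+--------+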

and for the vector input:

val udfVectorToCategory = udf[Int, org.apache.spark.ml.linalg.Vector] {
  _.toArray match {
    case Array(one, two, three) if one > two && one > three => 1
    case Array(one, two, three) if two > one && two > three => 2
    case _ => 0
  }
}
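To try this one out, the three numeric columns first have to be packed into a single vector column, for example with VectorAssembler (a minimal sketch; the features column name is just an assumption for illustration):

import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.sql.functions.col

// assemble One, Two, Three into a single ml Vector column
val assembled = new VectorAssembler()
  .setInputCols(Array("One", "Two", "Three"))
  .setOutputCol("features") // hypothetical column name
  .transform(df)

assembled.withColumn("category", udfVectorToCategory(col("features"))).show()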

In general, for the first case you should use `when`:

import org.apache.spark.sql.functions.when
import spark.implicits._ // for the $"..." column syntax

when($"one" > $"two" && $"one" > $"three", 1)
  .when($"two" > $"one" && $"two" > $"three", 2)
  .otherwise(0)

where one, two, three are column names.
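Dropped into withColumn, this gives the same result without a UDF, which keeps the whole expression visible to the Catalyst optimizer (a sketch using the question's df; column names resolve case-insensitively by default, so $"One" and $"one" are interchangeable here):

import org.apache.spark.sql.functions.when
import spark.implicits._

df.withColumn("category",
  when($"One" > $"Two" && $"One" > $"Three", 1)
    .when($"Two" > $"One" && $"Two" > $"Three", 2)
    .otherwise(0))
  .show()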
