javadba javadba - 3 months ago 32
Scala Question

Spark udf initialization

I want to create a custom regex-based UDF in Spark SQL. My preference would be to create a memory-resident

Map[String,Pattern]

where Pattern refers to a compiled regex version of the string key. To do this, the map creation needs to go into an "initialize" function of the UDF.

So are there any structures for a Spark UDF that support persistent state across invocations (via Spark SQL)?

Note that Hive does support a lifecycle for UDFs. I used that to generate parse trees as part of the initialization, so that each actual invocation of the UDF ran against lightning-fast trees with no parsing involved.

Answer

Let's start with imports and some dummy data:

// In spark-shell, sc and the implicits needed for toDF and $ are already in scope
import org.apache.spark.sql.functions.udf
import scala.util.matching.Regex
import java.util.regex.Pattern

val df = sc.parallelize(Seq(
  ("foo", "this is bar"), ("foo", "this is foo"),
  ("bar", "foobar"), ("bar", "foo and foo")
)).toDF("type", "value")

and the map of compiled patterns:

val patterns: Map[String, Pattern] = Seq(("foo", ".*foo.*"), ("bar", ".*bar.*"))
   .map{case (k, v) => (k, new Regex(v).pattern)}
   .toMap

Now I see two different options:

  • make patterns a broadcast variable referenced inside the UDF

    val patternsBd = sc.broadcast(patterns)
    
    val typeMatchedViaBroadcast = udf((t: String, v: String) =>
      patternsBd.value.get(t).map(m => m.matcher(v).matches))
    
    df.withColumn("match", typeMatchedViaBroadcast($"type", $"value")).show
    
    // +----+-----------+-----+
    // |type|      value|match|
    // +----+-----------+-----+
    // | foo|this is bar|false|
    // | foo|this is foo| true|
    // | bar|     foobar| true|
    // | bar|foo and foo|false|
    // +----+-----------+-----+
    
  • pass the map inside a closure

    def makeTypeMatchedViaClosure(patterns: Map[String, Pattern]) = udf(
      (t: String, v: String) => patterns.get(t).map(m => m.matcher(v).matches))
    
    val typeMatchedViaClosure = makeTypeMatchedViaClosure(patterns)
    
    df.withColumn("match", typeMatchedViaClosure($"type", $"value")).show
    
    // +----+-----------+-----+
    // |type|      value|match|
    // +----+-----------+-----+
    // | foo|this is bar|false|
    // | foo|this is foo| true|
    // | bar|     foobar| true|
    // | bar|foo and foo|false|
    // +----+-----------+-----+
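The closure option can also be checked without a cluster. Below is a minimal sketch in plain Scala, assuming a hypothetical helper named makeMatcher (not part of Spark or of the code above): the regexes are compiled once when the function is built, and every later call reuses the compiled Pattern objects, which is the "initialize once" behaviour the question asks about.

```scala
import java.util.regex.Pattern

// Hypothetical helper, for illustration only: compiles every regex once and
// returns a function that closes over the compiled map.
def makeMatcher(raw: Map[String, String]): (String, String) => Option[Boolean] = {
  // Compilation happens here, once, not on each call of the returned function.
  val compiled: Map[String, Pattern] =
    raw.map { case (k, v) => k -> Pattern.compile(v) }

  // None when the type has no registered pattern, otherwise the match result.
  (t: String, v: String) => compiled.get(t).map(_.matcher(v).matches)
}

val typeMatched = makeMatcher(Map("foo" -> ".*foo.*", "bar" -> ".*bar.*"))
```

Wrapping the returned function with udf(...) should give the same shape as makeTypeMatchedViaClosure above. A type with no entry in the map yields None, which Spark would be expected to show as null in the match column.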
    