Hugo Reyes Hugo Reyes - 1 year ago 402
Python Question

What is the best way to remove accents with apache spark dataframes in PySpark?

I need to delete accents from characters in spanish and others languages from different datasets.

I already did a function based in the code provided in this post that removes special the accents. The problem is that the function is slow because it uses an UDF.
I'm just wondering if I can improve the performance of my function to get results in less time, because this is good for small dataframes but not for big ones.

Thanks in advance.

Here the code, you will be able to run it as it is presented:

# Importing sql types
from pyspark.sql.types import StringType, IntegerType, StructType, StructField
from pyspark.sql.functions import udf,col
import unicodedata

# Building a simple dataframe:
schema = StructType([
StructField("city", StringType(), True),
StructField("country", StringType(), True),
StructField("population", IntegerType(), True)])

countries = ['Venezuela', 'US@A', 'Brazil', 'Spain']
cities = ['Maracaibó', 'New York', ' São Paulo ', '~Madrid']
population = [37800000,19795791,12341418,6489162]

# Dataframe:
df = sqlContext.createDataFrame(list(zip(cities, countries, population)), schema=schema)

class Test():
def __init__(self, df):
self.df = df

def clearAccents(self, columns):
"""This function deletes accents in strings column dataFrames,
it does not eliminate main characters, but only deletes special tildes.

:param columns String or a list of column names.
# Filters all string columns in dataFrame
validCols = [c for (c, t) in filter(lambda t: t[1] == 'string', self.df.dtypes)]

# If None or [] is provided with column parameter:
if (columns == "*"): columns = validCols[:]

# Receives a string as an argument
def remove_accents(inputStr):
# first, normalize strings:
nfkdStr = unicodedata.normalize('NFKD', inputStr)
# Keep chars that has no other char combined (i.e. accents chars)
withOutAccents = u"".join([c for c in nfkdStr if not unicodedata.combining(c)])
return withOutAccents

function = udf(lambda x: remove_accents(x) if x != None else x, StringType())
exprs = [function(col(c)).alias(c) if (c in columns) and (c in validCols) else c for c in self.df.columns]
self.df =*exprs)

foo = Test(df)

Answer Source

One possible improvement is to build a custom Transformer, which will handle Unicode normalization, and corresponding Python wrapper. It should reduce overall overhead of passing data between JVM and Python and doesn't require any modifications in Spark itself or access to private API.

On JVM side you'll need a transformer similar to this one:


import java.text.Normalizer
import org.apache.spark.sql.types.{DataType, StringType}

class UnicodeNormalizer (override val uid: String)
  extends UnaryTransformer[String, String, UnicodeNormalizer] {

  def this() = this(Identifiable.randomUID("unicode_normalizer"))

  private val forms = Map(
    "NFC" -> Normalizer.Form.NFC, "NFD" -> Normalizer.Form.NFD,
    "NFKC" -> Normalizer.Form.NFKC, "NFKD" -> Normalizer.Form.NFKD

  val form: Param[String] = new Param(this, "form", "unicode form (one of NFC, NFD, NFKC, NFKD)",

  def setN(value: String): this.type = set(form, value)

  def getForm: String = $(form)

  setDefault(form -> "NFKD")

  override protected def createTransformFunc: String => String = {
    val normalizerForm = forms($(form))
    (s: String) => Normalizer.normalize(s, normalizerForm)

  override protected def validateInputType(inputType: DataType): Unit = {
    require(inputType == StringType, s"Input type must be string type but got $inputType.")

  override protected def outputDataType: DataType = StringType

Corresponding build definition:

name := "unicode-normalization"

version := "1.0"

crossScalaVersions := Seq("2.10.6", "2.11.8")

organization := "net.zero323"

val sparkVersion = "1.6.2"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "org.apache.spark" %% "spark-sql" % sparkVersion,
  "org.apache.spark" %% "spark-mllib" % sparkVersion

On Python side you'll need a wrapper similar to this one. If you use 2.0+ keyword_only has been moved to top pyspark module.

from import *
from import keyword_only
from import JavaTransformer

class UnicodeNormalizer(JavaTransformer, HasInputCol, HasOutputCol):

    def __init__(self, form="NFKD", inputCol=None, outputCol=None):
        super(UnicodeNormalizer, self).__init__()
        self._java_obj = self._new_java_obj(
            "", self.uid)
        self.form = Param(self, "form",
            "unicode form (one of NFC, NFD, NFKC, NFKD)")
        kwargs = self.__init__._input_kwargs

    def setParams(self, form="NFKD", inputCol=None, outputCol=None):
        kwargs = self.setParams._input_kwargs
        return self._set(**kwargs)

    def setForm(self, value):
        return self._set(form=value)

    def getForm(self):
        return self.getOrDefault(self.form)

Build Scala package:

sbt +package

include it when you start shell or submit. For example for Spark build with Scala 2.10:

bin/pyspark --jars path-to/target/scala-2.10/unicode-normalization_2.10-1.0.jar \
 --driver-class-path path-to/target/scala-2.10/unicode-normalization_2.10-1.0.jar

and you should be ready to go. All what is left is a little bit of regexp magic:

from pyspark.sql.functions import regexp_replace

normalizer = UnicodeNormalizer(form="NFKD",
    inputCol="text", outputCol="text_normalized")

df = sc.parallelize([
    (1, "Maracaibó"), (2, "New York"),
    (3, "   São Paulo   "), (4, "~Madrid")
]).toDF(["id", "text"])

    .select(regexp_replace("text_normalized", "\p{M}", ""))

## +--------------------------------------+
## |regexp_replace(text_normalized,\p{M},)|
## +--------------------------------------+
## |                             Maracaibo|
## |                              New York|
## |                          Sao Paulo   |
## |                               ~Madrid|
## +--------------------------------------+

Please note that this follows the same conventions as built in text transformers and is not null safe. You can easily correct for that by check for null in createTransformFunc.

Recommended from our users: Dynamic Network Monitoring from WhatsUp Gold from IPSwitch. Free Download