
Create a custom Transformer in PySpark ML

I am new to Spark SQL DataFrames and ML on them (PySpark).
How can I create a custom tokenizer, which for example removes stop words and uses some libraries from nltk? Can I extend the default one?



Can I extend the default one?

Not really. The default Tokenizer is a subclass of pyspark.ml.wrapper.JavaTransformer and, like the other transformers and estimators in pyspark.ml.feature, it delegates the actual processing to its Scala counterpart. Since you want to use Python, you should extend pyspark.ml.Transformer directly.
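
To make the pure-Python route concrete, here is a minimal, hypothetical sketch (the class name is made up): a Transformer only has to override _transform and return a DataFrame. The full NLTK example below builds on this skeleton and adds parameters.

from pyspark.ml import Transformer

class PassthroughTransformer(Transformer):
    # Illustrative only: returns the input DataFrame unchanged.
    def _transform(self, dataset):
        return dataset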

import nltk

from pyspark import keyword_only  ## < 2.0 -> pyspark.ml.util.keyword_only
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType

class NLTKWordPunctTokenizer(Transformer, HasInputCol, HasOutputCol):

    @keyword_only
    def __init__(self, inputCol=None, outputCol=None, stopwords=None):
        super(NLTKWordPunctTokenizer, self).__init__()
        self.stopwords = Param(self, "stopwords", "words to be filtered out of the token list")
        self._setDefault(stopwords=set())
        # keyword_only stashes the passed keyword arguments on the instance
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, inputCol=None, outputCol=None, stopwords=None):
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    def setStopwords(self, value):
        return self._set(stopwords=value)

    def getStopwords(self):
        return self.getOrDefault(self.stopwords)

    def _transform(self, dataset):
        stopwords = self.getStopwords()

        def f(s):
            # tokenize with NLTK and drop the configured stop words
            tokens = nltk.tokenize.wordpunct_tokenize(s)
            return [t for t in tokens if t.lower() not in stopwords]

        t = ArrayType(StringType())
        out_col = self.getOutputCol()
        in_col = dataset[self.getInputCol()]
        # wrap f as a UDF returning an array of strings and add the output column
        return dataset.withColumn(out_col, udf(f, t)(in_col))
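
Because the class mixes in HasInputCol and HasOutputCol it exposes the standard column params, so it can also be chained with built-in stages in a regular pyspark.ml.Pipeline. A rough sketch under those assumptions (the CountVectorizer stage and column names are placeholders, not part of the original answer):

import nltk
from pyspark.ml import Pipeline
from pyspark.ml.feature import CountVectorizer

nltk.download("stopwords")  # fetch NLTK's stop word lists if not already present

tok = NLTKWordPunctTokenizer(
    inputCol="sentence", outputCol="words",
    stopwords=set(nltk.corpus.stopwords.words("english")))
counts = CountVectorizer(inputCol="words", outputCol="features")

pipeline = Pipeline(stages=[tok, counts])
# pipeline.fit(sentenceDataFrame).transform(sentenceDataFrame).show()
# (sentenceDataFrame is the example DataFrame created below)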

Example usage (data taken from the Spark ML - Features guide):

sentenceDataFrame = spark.createDataFrame([
  (0, "Hi I heard about Spark"),
  (0, "I wish Java could use case classes"),
  (1, "Logistic regression models are neat")
], ["label", "sentence"])

tokenizer = NLTKWordPunctTokenizer(
    inputCol="sentence", outputCol="words",