Derek Jedamski - 5 months ago
Python Question

Pyspark - Sum over multiple sparse vectors (CountVectorizer Output)

I have a dataset with ~30k unique documents that were flagged because they have a certain keyword in them. Some of the key fields in the dataset are document title, filesize, keyword, and excerpt (50 words around keyword). Each of these ~30k unique documents has multiple keywords in it, and each document has one line in the dataset per keyword (thus, each document has multiple lines). Here is a sample of what the key fields in the raw dataset might look like:

Raw Data Example

My goal is to build a model to flag documents for certain occurrences (kids complaining about homework, etc.), so I need to vectorize the keyword and excerpt fields and then condense them down so we have one line per unique document.

Using only the keywords as an example of what I'm trying to do: I applied Tokenizer, StopWordsRemover, and CountVectorizer, which output one sparse vector of token counts per row. One sparse vector might look something like: SparseVector(158, {7: 1.0, 65: 1.0, 78: 2.0, 110: 1.0, 155: 3.0})

I want to do one of two things:

  1. Convert the sparse vectors to dense vectors, then I can groupby docID and sum up each column (one column = one token)

  2. Directly sum across the sparse vectors (grouping by docID)

To give you an idea of what I mean: on the left of the image below is the desired dense vector representation of the output of CountVectorizer, and on the right is the final dataset I want.

CountVectorizer Output & Desired Dataset


I would try:

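>>> # Assumes Spark 2.x or later; on Spark 1.x the module is pyspark.mllib.linalg
>>> from pyspark.ml.linalg import SparseVector, DenseVector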
>>> df = sc.parallelize([
...     (1, SparseVector(158, {7: 1.0, 65: 1.0, 78: 2.0, 110: 1.0, 155: 3.0})),
...     (1, SparseVector(158, {99: 100.0})),
...     (2, SparseVector(158, {1: 1.0})),
... ]).toDF(["docId", "features"])
>>> df.rdd.mapValues(lambda v: v.toArray()) \
...     .reduceByKey(lambda x, y: x + y) \
...     .mapValues(lambda x: DenseVector(x)) \
...     .toDF(["docId", "features"])
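If the vocabulary is large and you would rather not densify, option 2 from the question (summing the sparse vectors directly) comes down to merging index-to-count maps. Here is a minimal sketch of that merge in plain Python, with no Spark involved. The helper names add_sparse and sum_by_doc are hypothetical, and each sparse vector is assumed to be represented as a plain {index: count} dict rather than a SparseVector:

```python
from collections import defaultdict


def add_sparse(a, b):
    """Sum two sparse count vectors given as {index: value} dicts."""
    out = dict(a)  # copy so the inputs are left untouched
    for idx, val in b.items():
        out[idx] = out.get(idx, 0.0) + val
    return out


def sum_by_doc(rows):
    """Group (doc_id, sparse_dict) rows by doc_id and sum each group."""
    totals = defaultdict(dict)
    for doc_id, vec in rows:
        totals[doc_id] = add_sparse(totals[doc_id], vec)
    return dict(totals)


# Same sample rows as above, written as plain dicts (illustrative only).
rows = [
    (1, {7: 1.0, 65: 1.0, 78: 2.0, 110: 1.0, 155: 3.0}),
    (1, {99: 100.0}),
    (2, {1: 1.0}),
]
totals = sum_by_doc(rows)
```

In PySpark the same idea would probably look like mapping each vector to dict(zip(v.indices, v.values)), passing add_sparse to reduceByKey, and rebuilding with SparseVector(size, merged_dict). I have not run that variant against a real CountVectorizer output, so treat it as a starting point.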