user1733690 - 2 months ago
Scala Question

MatchError while accessing vector column in Spark 2.0

I am trying to build an LDA model from a JSON file.

  1. Creating a spark context with the JSON file

import org.apache.spark.sql.SparkSession

val sparkSession = SparkSession.builder
  .config("spark.some.config.option", "config-value")
  .getOrCreate()

// Load the JSON documents into a DataFrame
val df = sparkSession.read.json("dbfs:/mnt/JSON6/JSON/sampleDoc.txt")

  1. Displaying the df should show the dataframe

display(df)

  1. Tokenize the text


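import org.apache.spark.ml.feature.RegexTokenizer

// Set params for RegexTokenizer
val tokenizer = new RegexTokenizer()
  .setMinTokenLength(4) // Filter away tokens with length < 4
  .setInputCol("text") // the "text" column selected later in this post
  .setOutputCol("tokens") // output column name assumed for illustration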

// Tokenize document
val tokenized_df = tokenizer.transform(df)

  1. This should display the tokenized_df

display(tokenized_df)

  1. Get the stopwords

%sh wget
-O /tmp/stopwords

  1. Optional: copying the stopwords to the tmp folder

%fs cp file:/tmp/stopwords dbfs:/tmp/stopwords

  1. Collecting all the stopwords

val stopwords = sc.textFile("/tmp/stopwords").collect()

  1. Filtering out the stopwords


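import org.apache.spark.ml.feature.StopWordsRemover

// Set params for StopWordsRemover
val remover = new StopWordsRemover()
  .setStopWords(stopwords) // This parameter is optional
  .setInputCol("tokens") // assumed name of the tokenizer's output column
  .setOutputCol("filtered") // output column name assumed for illustration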

// Create new DF with Stopwords removed
val filtered_df = remover.transform(tokenized_df)

  1. Displaying the filtered df should verify the stopwords got removed

display(filtered_df)

  1. Vectorizing the frequency of occurrence of words

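import org.apache.spark.ml.feature.CountVectorizer
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.sql.Row

// Set params for CountVectorizer
val vectorizer = new CountVectorizer()
  .setInputCol("filtered") // assumed name of the remover's output column
  .setOutputCol("features")
  .fit(filtered_df) // fit returns a CountVectorizerModel, which provides transform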

  1. Verify the vectorizer

vectorizer.transform(filtered_df).select("id", "text", "features").show()

After this, I run into a problem when feeding the vectorizer output to LDA. I believe the issue is that CountVectorizer produces sparse vectors while LDA requires dense vectors, but I am still trying to figure it out.

Here is the exception thrown when map fails to convert the rows.

import org.apache.spark.mllib.linalg.Vector
val ldaDF = vectorizer.transform(filtered_df).select("id", "features").map { case Row(id: String, countVector: Vector) => (id, countVector) }


org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 4083.0 failed 4 times, most recent failure: Lost task 0.3 in stage 4083.0 (TID 15331, scala.MatchError: [0,(1252,[13,17,18,20,30,37,45,50,51,53,63,64,96,101,108,125,174,189,214,221,224,227,238,268,291,309,328,357,362,437,441,455,492,493,511,528,561,613,619,674,764,823,839,980,1098,1143],[1.0,1.0,2.0,1.0,1.0,1.0,2.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,3.0,1.0,2.0,1.0,5.0,1.0,2.0,2.0,1.0,4.0,1.0,2.0,3.0,1.0,1.0,1.0,1.0,1.0,2.0,1.0,1.0,1.0,1.0,1.0,2.0,1.0,2.0,1.0,1.0,1.0])] (of class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema)

Here is a working LDA sample that does not throw any error:

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.sql.Row
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.clustering.{DistributedLDAModel, LDA}

val a = Vectors.dense(Array(1.0,2.0,3.0))
val b = Vectors.dense(Array(3.0,4.0,5.0))
val df = Seq((1L,a),(2L,b),(2L,a)).toDF

val ldaDF = df.map { case Row(id: Long, countVector: Vector) => (id, countVector) }

val model = new LDA().setK(3).run(ldaDF.javaRDD)

The only difference is that the second snippet uses dense vectors.


This has nothing to do with sparsity. Since Spark 2.0.0, ML Transformers no longer generate o.a.s.mllib.linalg.VectorUDT but o.a.s.ml.linalg.VectorUDT, and are mapped locally to subclasses of o.a.s.ml.linalg.Vector. These are not compatible with the old MLlib API, which is moving towards deprecation in Spark 2.0.0.
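To make the type difference concrete, here is a minimal sketch, assuming a local Spark 2.x session and a tiny made-up corpus (the object name, the sample tokens, and the "filtered" column name are illustrative and not from the question). It pattern-matches the CountVectorizer output against the new `org.apache.spark.ml.linalg.Vector` type, which is the type the rows actually carry:

```scala
import org.apache.spark.ml.feature.CountVectorizer
import org.apache.spark.ml.linalg.{Vector => MLVector}
import org.apache.spark.sql.{Row, SparkSession}

object VectorTypeCheck {
  // Returns the (id, vector) pairs extracted from the vectorized rows.
  def run(): Array[(Long, MLVector)] = {
    val spark = SparkSession.builder
      .master("local[1]")
      .appName("vector-type-check")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical, already tokenized documents
    val docs = Seq(
      (0L, Seq("spark", "vector", "spark")),
      (1L, Seq("topic", "model"))
    ).toDF("id", "filtered")

    val vectorized = new CountVectorizer()
      .setInputCol("filtered")
      .setOutputCol("features")
      .fit(docs)
      .transform(docs)
      .select("id", "features")

    // Print the SQL type of the column to inspect which VectorUDT is in use
    println(vectorized.schema("features").dataType)

    // Matching against the ml.linalg Vector succeeds; the same pattern written
    // with org.apache.spark.mllib.linalg.Vector would not match these rows.
    val pairs = vectorized.rdd.map {
      case Row(id: Long, v: MLVector) => (id, v)
    }.collect()

    spark.stop()
    pairs
  }

  def main(args: Array[String]): Unit = run().foreach(println)
}
```

The sketch goes through `.rdd.map` only to keep the example independent of Dataset encoders; the point is the vector type used in the pattern.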

You could extract the values and convert them to the old vector class

import org.apache.spark.mllib.linalg.{Vector => OldVector, Vectors => OldVectors}
import org.apache.spark.ml.linalg.{Vector, DenseVector, SparseVector}

// Convert a new ml.linalg vector to the old mllib.linalg representation
def toOld(v: Vector): OldVector = v match {
  case sv: SparseVector => OldVectors.sparse(sv.size, sv.indices, sv.values)
  case dv: DenseVector => OldVectors.dense(dv.values)
}

but it makes more sense to use the ML implementation of LDA if you already use ML transformers.
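A minimal sketch of that second option, assuming a local Spark 2.x session and a tiny made-up corpus (the object name, the sample tokens, the "filtered" column name, and the `setMaxIter(10)` setting are illustrative choices, not taken from the question). It feeds the CountVectorizer output directly into `org.apache.spark.ml.clustering.LDA`, so no vector conversion is needed:

```scala
import org.apache.spark.ml.clustering.LDA
import org.apache.spark.ml.feature.CountVectorizer
import org.apache.spark.sql.SparkSession

object MlLdaSketch {
  // Fits an ML LDA model and returns the number of topics it describes.
  def run(): Int = {
    val spark = SparkSession.builder
      .master("local[1]")
      .appName("ml-lda-sketch")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical, already tokenized and filtered documents
    val docs = Seq(
      (0L, Seq("spark", "vector", "spark", "model")),
      (1L, Seq("topic", "model", "topic")),
      (2L, Seq("vector", "sparse", "dense"))
    ).toDF("id", "filtered")

    val vectorized = new CountVectorizer()
      .setInputCol("filtered")
      .setOutputCol("features")
      .fit(docs)
      .transform(docs)

    // The ML LDA estimator reads the ml.linalg vectors directly
    val model = new LDA()
      .setK(3)
      .setMaxIter(10)
      .setFeaturesCol("features")
      .fit(vectorized)

    // One row per topic, with the indices and weights of its top terms
    val topics = model.describeTopics(3)
    topics.show(false)

    val topicCount = topics.count().toInt
    spark.stop()
    topicCount
  }

  def main(args: Array[String]): Unit = println(run())
}
```

If the old MLlib API cannot be avoided, `org.apache.spark.mllib.linalg.Vectors.fromML` (available since Spark 2.0.0) performs the same conversion as the hand-written `toOld` above.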