
MatchError while accessing vector column in Spark 2.0

I am trying to create an LDA model from a JSON file.


  1. Creating a SparkSession and reading the JSON file




import org.apache.spark.sql.SparkSession

val sparkSession = SparkSession.builder
.master("local")
.appName("my-spark-app")
.config("spark.some.config.option", "config-value")
.getOrCreate()

val df = sparkSession.read.json("dbfs:/mnt/JSON6/JSON/sampleDoc.txt")




  2. Displaying the df should show the DataFrame




display(df)



  3. Tokenize the text




import org.apache.spark.ml.feature.RegexTokenizer

// Set params for RegexTokenizer
val tokenizer = new RegexTokenizer()
.setPattern("[\\W_]+")
.setMinTokenLength(4) // Filter away tokens with length < 4
.setInputCol("text")
.setOutputCol("tokens")

// Tokenize document
val tokenized_df = tokenizer.transform(df)




  4. This should display the tokenized_df




display(tokenized_df)



  5. Get the stopwords




%sh wget http://ir.dcs.gla.ac.uk/resources/linguistic_utils/stop_words -O /tmp/stopwords



  6. Optional: copying the stopwords to the tmp folder




%fs cp file:/tmp/stopwords dbfs:/tmp/stopwords



  7. Collecting all the stopwords




val stopwords = sc.textFile("/tmp/stopwords").collect()



  8. Filtering out the stopwords




import org.apache.spark.ml.feature.StopWordsRemover

// Set params for StopWordsRemover
val remover = new StopWordsRemover()
.setStopWords(stopwords) // This parameter is optional
.setInputCol("tokens")
.setOutputCol("filtered")

// Create new DF with Stopwords removed
val filtered_df = remover.transform(tokenized_df)




  9. Displaying the filtered df should verify that the stopwords were removed




display(filtered_df)



  10. Vectorizing the frequency of occurrence of words




import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.sql.Row
import org.apache.spark.ml.feature.CountVectorizer

// Set params for CountVectorizer
val vectorizer = new CountVectorizer()
.setInputCol("filtered")
.setOutputCol("features")
.fit(filtered_df)




  11. Verify the vectorizer




vectorizer.transform(filtered_df).select("id", "text", "features", "filtered").show()


After this I am seeing an issue when fitting this vectorizer into LDA. I believe the issue is that CountVectorizer produces a sparse vector, but LDA requires a dense vector. I am still trying to figure out the issue.

Here is the exception, where the map is not able to convert the rows:

import org.apache.spark.mllib.linalg.Vector
val ldaDF = countVectors.map { case Row(id: String, countVector: Vector) => (id, countVector) }
display(ldaDF)





Exception

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 4083.0 failed 4 times, most recent failure: Lost task 0.3 in stage 4083.0 (TID 15331, 10.209.240.17): scala.MatchError: [0,(1252,[13,17,18,20,30,37,45,50,51,53,63,64,96,101,108,125,174,189,214,221,224,227,238,268,291,309,328,357,362,437,441,455,492,493,511,528,561,613,619,674,764,823,839,980,1098,1143],[1.0,1.0,2.0,1.0,1.0,1.0,2.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,3.0,1.0,2.0,1.0,5.0,1.0,2.0,2.0,1.0,4.0,1.0,2.0,3.0,1.0,1.0,1.0,1.0,1.0,2.0,1.0,1.0,1.0,1.0,1.0,2.0,1.0,2.0,1.0,1.0,1.0])] (of class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema)



Here is a working sample for LDA which does not throw any issue:

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.sql.Row
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.clustering.{DistributedLDAModel, LDA}

val a = Vectors.dense(Array(1.0,2.0,3.0))
val b = Vectors.dense(Array(3.0,4.0,5.0))
val df = Seq((1L,a),(2L,b),(2L,a)).toDF

val ldaDF = df.map { case Row(id: Long, countVector: Vector) => (id, countVector) }

val model = new LDA().setK(3).run(ldaDF.javaRDD)
display(df)


The only difference is that in the second snippet we are using dense vectors.

Answer

This has nothing to do with sparsity. Since Spark 2.0.0, ML Transformers no longer generate o.a.s.mllib.linalg.VectorUDT but o.a.s.ml.linalg.VectorUDT, and the values are mapped locally to subclasses of o.a.s.ml.linalg.Vector. These are not compatible with the old MLlib API, which is moving towards deprecation in Spark 2.0.0.
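
A quick way to see this is to inspect the schema and the runtime class of the values in the features column. This is only a diagnostic sketch; it assumes countVectors is the DataFrame returned by vectorizer.transform(filtered_df).select("id", "features") from the question.

// Diagnostic sketch only: countVectors is assumed to be
// vectorizer.transform(filtered_df).select("id", "features")
countVectors.schema("features").dataType.getClass
// -> class org.apache.spark.ml.linalg.VectorUDT

countVectors.first.getAs[Any]("features").getClass
// -> class org.apache.spark.ml.linalg.SparseVector

The pattern match in the question expects org.apache.spark.mllib.linalg.Vector, so the row falls through every case and Scala raises the MatchError.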

You can extract the values and convert them to the old vector class:

import org.apache.spark.mllib.linalg.{Vector => OldVector, Vectors => OldVectors}
import org.apache.spark.ml.linalg.{Vector, DenseVector, SparseVector}

// Convert a new ml vector back to the equivalent old mllib vector
def toOld(v: Vector): OldVector = v match {
  case sv: SparseVector => OldVectors.sparse(sv.size, sv.indices, sv.values)
  case dv: DenseVector => OldVectors.dense(dv.values)
}
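
For example, to keep feeding the old mllib LDA, the DataFrame can be mapped to an RDD of (id, old vector) pairs. This is only a sketch: it assumes countVectors is vectorizer.transform(filtered_df).select("id", "features") with a Long id column (mllib LDA expects RDD[(Long, Vector)]), and k = 10 is an illustrative value, not taken from the post.

import org.apache.spark.sql.Row

// Sketch only: assumes countVectors has columns (id: Long, features: ml Vector)
val ldaInput = countVectors.rdd.map {
  case Row(id: Long, v: Vector) => (id, toOld(v))
}

// k = 10 is an arbitrary example value
val ldaModel = new org.apache.spark.mllib.clustering.LDA().setK(10).run(ldaInput)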

but it makes more sense to use the ML implementation of LDA if you already use ML Transformers.
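
A minimal sketch of that ML-side route, assuming the vectorizer and filtered_df from the question; the values passed to setK and setMaxIter are illustrative, not taken from the post:

import org.apache.spark.ml.clustering.LDA

// Sketch only: stays entirely in the ml API, so no vector conversion is needed
val countVectors = vectorizer.transform(filtered_df).select("id", "features")

val ldaModel = new LDA()
  .setK(10)       // illustrative number of topics
  .setMaxIter(20) // illustrative iteration count
  .setFeaturesCol("features")
  .fit(countVectors)

ldaModel.describeTopics(5).show(false)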