C.Y. Wu C.Y. Wu - 1 year ago 160
Scala Question

Error when extracting features(spark)

I encountered some problems when I tried to extract features from raw data.

Here is my data:


and here is my code:

val rawData = sc.textFile("data/myData.data")
val lines = rawData.map(_.split(","))
val categoriesMap = lines.map(fields => fields(1)).distinct.collect.zipWithIndex.toMap

Here is the error info:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 1.0 failed 1 times, most recent failure: Lost task 1.0 in stage 1.0 (TID 3, localhost): java.lang.ArrayIndexOutOfBoundsException: 1

I want to extract the second column as the categorical feature, but it seems that it cannot read the column and leads to ArrayIndexOutOfBoundsException.
I tried many times but still cannot solve the problem.

val categoriesMap1 = lines.map(fields => fields(1)).distinct.collect.zipWithIndex.toMap
val labelpointRDD = lines.map { fields =>
val categoryFeaturesArray1 = Array.ofDim[Double](categoriesMap1.size)
val categoryIdx1 = categoriesMap1(fields(1))
categoryFeaturesArray1(categoryIdx1) = 1 }

Answer Source

Your code works for the example you supplied - which means it's fine for "valid" rows - but your input probably contains some invalid rows - in this case, rows with no commas.

You can either clean your data or improve the code to handle these rows more gracefully, for example using some default value for bad rows:

val rawData = sc.parallelize(Seq(

val lines = rawData.map(_.split(","))
val categoriesMap = lines.map {
  case Array(_, s, _*) => s // for arrays with 2 or more items - use 2nd
  case _ => "UNKNOWN"       // default

println(categoriesMap) // prints Map(UNKNOWN -> 0, Private -> 1)

UPDATE: per updated question - assuming these rows are indeed invalid, you can just skip them entirely, both when extracting the categories map and when mapping to labeled points:

val secondColumn: RDD[String] = lines.collect {
  case Array(_, s, _*) => s // for arrays with 2 or more items - use 2nd
  // shorter arrays (bad records) will be ~~filtered out~~

val categoriesMap = secondColumn.distinct().collect().zipWithIndex.toMap
val labelpointRDD = secondColumn.map { field =>
  val categoryFeaturesArray1 = Array.ofDim[Double](categoriesMap.size)
  val categoryIdx1 = categoriesMap(field)
  categoryFeaturesArray1(categoryIdx1) = 1
Recommended from our users: Dynamic Network Monitoring from WhatsUp Gold from IPSwitch. Free Download