João_testeSW - 1 year ago
Scala Question

Get Cluster_ID and the rest of table using Spark MLlib KMeans

I have this dataset (showing just a few rows):


I'm trying to assign all these rows to 4 clusters using K-Means. For that I'm using the code from this post: Spark MLLib Kmeans from dataframe, and back again

val data = sc.textFile("/user/cloudera/TESTE1")
val idPointRDD = data.map(s => (s(0), Vectors.dense(s(1).toInt,s(2).toInt))).cache()
val clusters = KMeans.train(idPointRDD.map(_._2), 4, 20)
val clustersRDD = clusters.predict(idPointRDD.map(_._2))
val idClusterRDD = idPointRDD.map(_._1).zip(clustersRDD)
val idCluster = idClusterRDD.toDF("purchase","id","product","cluster")

This is the output I get:

scala> import org.apache.spark.mllib.clustering.{KMeans, KMeansModel}
import org.apache.spark.mllib.clustering.{KMeans, KMeansModel}

scala> import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.Vectors

scala> val data = sc.textFile("/user/cloudera/TESTE")
data: org.apache.spark.rdd.RDD[String] = /user/cloudera/TESTE MapPartitionsRDD[7] at textFile at <console>:29

scala> val idPointRDD = data.map(s => (s(0), Vectors.dense(s(1).toInt,s(2).toInt))).cache()
idPointRDD: org.apache.spark.rdd.RDD[(Char, org.apache.spark.mllib.linalg.Vector)] = MapPartitionsRDD[8] at map at <console>:31

But when I run it I'm getting the following error:

java.lang.UnsupportedOperationException: Schema for type Char is not supported
at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:715)

How can I solve this problem?

Many thanks!

Answer Source

Here is the thing: you are reading a CSV of values into an RDD of String without splitting it into fields or converting them to numeric values. Because a String is a sequence of characters, s(0) returns the first Char of the line rather than the first field, and s(1).toInt compiles but returns the character code of a single Char, which is not what you are looking for. The Char in the first tuple position is also why toDF fails with "Schema for type Char is not supported".
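
To make the Char behaviour concrete, here is a minimal sketch in plain Scala (no Spark needed). The sample line is hypothetical, since the actual rows are not shown in the question, and a comma delimiter is assumed.

```scala
// Hypothetical input line; the real file's rows are not shown in the question.
val line = "7,12,345"

// Indexing a String yields a Char: this is the character '7', not a field.
val firstChar: Char = line(0)

// Char.toInt returns the character code (55 for '7'), not the digit 7.
val charCode: Int = line(0).toInt

// Splitting first gives one String per field, which can then be parsed.
val fields: Array[String] = line.split(",")
val parsed: Array[Double] = fields.map(_.trim.toDouble)
```

The same indexing happens inside data.map(s => ...), because each s there is one whole line of the file.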

You need to split each line of your data: RDD[String] on the delimiter and convert the fields to numbers:

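 import org.apache.spark.mllib.linalg.Vectors
 import org.apache.spark.rdd.RDD
 val data : RDD[String] = ??? // e.g. the result of sc.textFile(...)
 // Split each line on "," and build a dense vector from its three fields
 val idPointRDD = data.map {
     s => 
      s.split(",") match { 
      case Array(x,y,z) => Vectors.dense(x.toDouble, Integer.parseInt(y).toDouble,Integer.parseInt(z).toDouble)
      }
 }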

This should work for you!
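
Since the question also wants to keep an identifier next to each cluster, the parsing step could be sketched as below. This is only an illustration under assumptions: parseLine is a hypothetical helper (not part of Spark or of the original answer), the first field is treated as the identifier as in the question's code, and the other two fields are assumed to be numeric.

```scala
import scala.util.Try

// Hypothetical helper: splits one CSV line into (id, features).
// Assumes exactly three comma-separated fields: an id and two numbers.
def parseLine(line: String): Option[(String, Array[Double])] =
  line.split(",").map(_.trim) match {
    case Array(id, a, b) =>
      // Try turns a NumberFormatException into None instead of failing.
      Try(Array(a.toDouble, b.toDouble)).toOption.map(features => (id, features))
    case _ =>
      // Wrong number of fields: skip the row rather than throw a MatchError.
      None
  }
```

In the spark-shell, something like data.flatMap(s => parseLine(s)) would then give (id, features) pairs, and each features array can be passed to Vectors.dense before training. Because the id is now a String rather than a Char, a later toDF call no longer hits the unsupported Char schema.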
