gsamaras gsamaras - 3 months ago 25
Python Question

Compute Cost of Kmeans

I am using this model, which is not written by me. In order to predict the centroids I had to do this:

model = cPickle.load(open("/tmp/model_centroids_128d_pkl.lopq"))
codes = d.map(lambda x: (x[0], model.predict_coarse(x[1])))


where `d.first()` yields this:

(u'3768915289',
array([ -86.00641097, -100.41325623, <128 coords in total>]))


and
codes.first()
:

(u'3768915289', (5657, 7810))


How can I call computeCost() on this KMeans model?




After reading train_model.py, I am trying like this:

In [23]: from pyspark.mllib.clustering import KMeans, KMeansModel
In [24]: Cs = model.Cs # centroids
In [25]: model = KMeansModel(Cs[0]) # I am very positive this line is good
In [26]: costs = d.map(lambda x: model.computeCost(x[1]))
In [27]: costs.first()


but I get this error:

AttributeError: 'numpy.ndarray' object has no attribute 'map'


which means that Spark tries to use
map()
under the hood for
x[1]
...




which means that it expects an RDD!!! But in what form?

I am trying now with:

d = d.map(lambda x: x[1])
d.first()
array([ 7.17036494e+01, 1.07987890e+01, ...])
costs = model.computeCost(d)


but now I get this error (an `IllegalArgumentException: requirement failed`, which suggests the vectors' dimensions do not match the centroids' dimensions):

16/08/30 00:39:21 WARN TaskSetManager: Lost task 821.0 in stage 40.0 : java.lang.IllegalArgumentException: requirement failed
at scala.Predef$.require(Predef.scala:221)
at org.apache.spark.mllib.util.MLUtils$.fastSquaredDistance(MLUtils.scala:330)
at org.apache.spark.mllib.clustering.KMeans$.fastSquaredDistance(KMeans.scala:595)
at org.apache.spark.mllib.clustering.KMeans$$anonfun$findClosest$1.apply(KMeans.scala:569)
at org.apache.spark.mllib.clustering.KMeans$$anonfun$findClosest$1.apply(KMeans.scala:563)
at scala.collection.mutable.ArraySeq.foreach(ArraySeq.scala:73)
at org.apache.spark.mllib.clustering.KMeans$.findClosest(KMeans.scala:563)
at org.apache.spark.mllib.clustering.KMeans$.pointCost(KMeans.scala:586)
at org.apache.spark.mllib.clustering.KMeansModel$$anonfun$computeCost$1.apply(KMeansModel.scala:88)
at org.apache.spark.mllib.clustering.KMeansModel$$anonfun$computeCost$1.apply(KMeansModel.scala:88)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:144)
at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.fold(TraversableOnce.scala:199)
at scala.collection.AbstractIterator.fold(Iterator.scala:1157)
at org.apache.spark.rdd.RDD$$anonfun$fold$1$$anonfun$19.apply(RDD.scala:1086)
at org.apache.spark.rdd.RDD$$anonfun$fold$1$$anonfun$19.apply(RDD.scala:1086)
at org.apache.spark.SparkContext$$anonfun$36.apply(SparkContext.scala:1951)
at org.apache.spark.SparkContext$$anonfun$36.apply(SparkContext.scala:1951)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<ipython-input-44-6223595c8b5f> in <module>()
----> 1 costs = model.computeCost(d)

/home/gs/spark/current/python/pyspark/mllib/clustering.py in computeCost(self, rdd)
140 """
141 cost = callMLlibFunc("computeCostKmeansModel", rdd.map(_convert_to_vector),
--> 142 [_convert_to_vector(c) for c in self.centers])
143 return cost
144

/home/gs/spark/current/python/pyspark/mllib/common.py in callMLlibFunc(name, *args)
128 sc = SparkContext.getOrCreate()
129 api = getattr(sc._jvm.PythonMLLibAPI(), name)
--> 130 return callJavaFunc(sc, api, *args)
131
132

/home/gs/spark/current/python/pyspark/mllib/common.py in callJavaFunc(sc, func, *args)
121 """ Call Java Function """
122 args = [_py2java(sc, a) for a in args]
--> 123 return _java2py(sc, func(*args))
124
125

/home/gs/spark/current/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
811 answer = self.gateway_client.send_command(command)
812 return_value = get_return_value(
--> 813 answer, self.gateway_client, self.target_id, self.name)
814
815 for temp_arg in temp_args:

/home/gs/spark/current/python/pyspark/sql/utils.py in deco(*a, **kw)
43 def deco(*a, **kw):
44 try:
---> 45 return f(*a, **kw)
46 except py4j.protocol.Py4JJavaError as e:
47 s = e.java_exception.toString()

/home/gs/spark/current/python/lib/py4j-0.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
306 raise Py4JJavaError(
307 "An error occurred while calling {0}{1}{2}.\n".
--> 308 format(target_id, ".", name), value)
309 else:
310 raise Py4JError(

Py4JJavaError: An error occurred while calling o25177.computeCostKmeansModel.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 821 in stage 40.0 failed 4 times, most recent failure: Lost task 821.3 in stage 40.0: java.lang.IllegalArgumentException: requirement failed
at scala.Predef$.require(Predef.scala:221)
at org.apache.spark.mllib.util.MLUtils$.fastSquaredDistance(MLUtils.scala:330)
at org.apache.spark.mllib.clustering.KMeans$.fastSquaredDistance(KMeans.scala:595)
at org.apache.spark.mllib.clustering.KMeans$$anonfun$findClosest$1.apply(KMeans.scala:569)
at org.apache.spark.mllib.clustering.KMeans$$anonfun$findClosest$1.apply(KMeans.scala:563)
at scala.collection.mutable.ArraySeq.foreach(ArraySeq.scala:73)
at org.apache.spark.mllib.clustering.KMeans$.findClosest(KMeans.scala:563)
at org.apache.spark.mllib.clustering.KMeans$.pointCost(KMeans.scala:586)
at org.apache.spark.mllib.clustering.KMeansModel$$anonfun$computeCost$1.apply(KMeansModel.scala:88)
at org.apache.spark.mllib.clustering.KMeansModel$$anonfun$computeCost$1.apply(KMeansModel.scala:88)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:144)
at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.fold(TraversableOnce.scala:199)
at scala.collection.AbstractIterator.fold(Iterator.scala:1157)
at org.apache.spark.rdd.RDD$$anonfun$fold$1$$anonfun$19.apply(RDD.scala:1086)
at org.apache.spark.rdd.RDD$$anonfun$fold$1$$anonfun$19.apply(RDD.scala:1086)
at org.apache.spark.SparkContext$$anonfun$36.apply(SparkContext.scala:1951)
at org.apache.spark.SparkContext$$anonfun$36.apply(SparkContext.scala:1951)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)





Edit:

split_vecs = d.map(lambda x: np.split(x[1], 2))


seems to be a good step, since the centroids are of 64 dimensions.

model.computeCost((d.map(lambda x: x[1])).first())


gives this error:
AttributeError: 'numpy.ndarray' object has no attribute 'map'
.

Answer

According to the documentation, you have to:

  1. Create a model maybe by reading a previously saved model, or by fitting a new model.

  2. After obtaining that model you can use its method computeCost, which needs a well formatted RDD to output something useful.

Thus, if I assume that your variable model is a KMeansModel and the data stored in the variable d has the expected representation, then you should be able to run the following code:

model.computeCost(d)

Edit:

You should create an RDD that will contain vectors of the same dimensions as the centroids, and provide that as an input parameter to computeCost().

Comments