
How to serialise elastic4s ElasticSearch Client to run with Spark RDD?

I am currently running Spark MLlib ALS on millions of users and products. With the code below, the collect step takes longer than the recommendProductsForUsers step because of heavy shuffling to disk. If I could remove the collect step and write the data to Elasticsearch directly from the executors, a lot of time and computing resources would be saved.

import com.sksamuel.elastic4s.ElasticClient
import com.sksamuel.elastic4s.ElasticDsl._
import org.elasticsearch.common.settings.ImmutableSettings


val settings = ImmutableSettings.settingsBuilder().put("cluster.name", "MYCLUSTER").build()
val client = ElasticClient.remote(settings, "11.11.11.11", 9300)
var ESMap = Map[String, List[String]]()

val topKReco = bestModel.get
  // below step takes 3 hours
  .recommendProductsForUsers(30)
  // below step takes 6 hours
  .collect()
  .foreach { r =>
    var i = 1
    val curr_user = r._1
    r._2.foreach { r2 =>
      ESMap += i.toString -> List(r2.product.toString, item_ids(r2.product))
      i += 1
    }
    client.execute {
      index into "recommendations1" / "items" id curr_user fields ESMap
    }.await
  }


Now, when I run this code without the collect step, I get the following error:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:315)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:305)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1893)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:869)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:868)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
at org.apache.spark.rdd.RDD.foreach(RDD.scala:868)
at CatalogALS2$.main(CatalogALS2.scala:157)
at CatalogALS2.main(CatalogALS2.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: com.sksamuel.elastic4s.ElasticClient
Serialization stack:
- object not serializable (class: com.sksamuel.elastic4s.ElasticClient, value: com.sksamuel.elastic4s.ElasticClient@e4c4af)
- field (class: CatalogALS2$$anonfun$2, name: client$1, type: class com.sksamuel.elastic4s.ElasticClient)
- object (class CatalogALS2$$anonfun$2, <function1>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:312)


What I understand from this is: if I can somehow serialise the com.sksamuel.elastic4s.ElasticClient class, then I can run this task in parallel without collecting the data to the driver.
To generalise the problem: how can I serialise any class or function in Scala so that it can be used in an operation on an RDD?

Answer

Found an answer for this by wrapping the client in a serializable object:

import com.sksamuel.elastic4s.ElasticClient
import org.elasticsearch.common.settings.ImmutableSettings

object ESConnection extends Serializable {

  // Elasticsearch client initiation; the lazy val is only created when first used
  val settings = ImmutableSettings.settingsBuilder().put("cluster.name", "MyCluster").build()
  lazy val client = ElasticClient.remote(settings, "11.11.11.11", 9300)

}
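
This works because the ESConnection object is never serialised and shipped with the closure at all: the closure only refers to the singleton by name, and each executor JVM initialises its own copy of the object when it is first touched. The lazy val additionally delays opening the connection until that first use on the executor. The same pattern can hold any non-serialisable resource; the sketch below is a generic illustration, where NonSerializableWriter, WriterHolder and save are hypothetical names, not part of the original code:

    import org.apache.spark.rdd.RDD

    // Stands in for any client that is not java.io.Serializable.
    class NonSerializableWriter(host: String) {
      def write(doc: String): Unit = println(s"$host <- $doc")
    }

    object WriterHolder extends Serializable {
      // Created lazily, once per executor JVM; never shipped from the driver.
      lazy val writer = new NonSerializableWriter("11.11.11.11:9300")
    }

    def save(records: RDD[String]): Unit =
      records.foreach(doc => WriterHolder.writer.write(doc))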

Then you can use it over the RDD on the executors, without collecting the data to the driver:

val topKReco = bestModel.get
  .recommendProductsForUsers(30)
  // no collect required now
  .foreach { r =>
    var i = 1
    val curr_user = r._1
    var ESMap = Map[String, List[String]]()

    r._2.foreach { r2 =>
      ESMap += i.toString -> List(r2.product.toString, item_ids(r2.product))
      i += 1
    }
    ESConnection.client.execute {
      index into "recommendation1" / "items" id curr_user fields ESMap
    }.await
  }
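
If one request per user turns out to be the bottleneck, a common refinement (not part of the original answer, and assuming the bulk(...) DSL is available in the elastic4s version in use) is to batch the writes with foreachPartition, so each executor sends one bulk request per group of users instead of one request per user; the batch size of 500 below is an arbitrary example:

    bestModel.get
      .recommendProductsForUsers(30)
      .foreachPartition { users =>
        users.grouped(500).foreach { batch =>
          val requests = batch.map { case (user, recs) =>
            // same per-user map as above, built from the recommendations
            val fieldsMap = recs.zipWithIndex.map { case (r2, idx) =>
              (idx + 1).toString -> List(r2.product.toString, item_ids(r2.product))
            }.toMap
            index into "recommendation1" / "items" id user fields fieldsMap
          }
          // one round trip per batch instead of one per user
          ESConnection.client.execute { bulk(requests: _*) }.await
        }
      }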