Dave Dave - 4 months ago 34
Scala Question

Cannot Instanciate ZkClient due to invalid ZKStringSerializer reference

I am migrating a Play v 2.3.4 app to Play v 2.5.4. Along the way I had to also upgrade to Scala 2.11.8 and kafka 9.0+ to support the updated Play version.

Most of the issues I have worked out but I cannot figure out a Kafka issue with some code that manages Kafka topics though AdminUtils. The troubles are all centered around kafka.utils.ZkStringSerialzier.

I am using org.I0Itec.zkclient package to instances ZkClient object that is passed in the construction of ZkUtils object but it fails because it cannot resolve my ZkStringSerializer.

Related code is:

import kafka.admin.AdminUtils
import kafka.utils.ZkUtils
import kafka.utils.ZKStringSerializer
import org.I0Itec.zkclient.{ZkClient, ZkConnection}
object Topic {
def CreateKafkaTopic(topic: String, zookeeperHosts: String, partitionSize: Int, replicationCount: Int, connectionTimeoutMs: Int = 10000, sessionTimeoutMs: Int = 10000): Boolean = {
var zkSerializer: ZKStringSerializer = ZKStringSerializer
val zkClient: ZkClient= new ZkClient(zookeeperHosts, connectionTimeoutMs, sessionTimeoutMs, zkSerializer)
val topicConfig: Properties = new Properties()
val isSecureKafkaCluster: Boolean = false

val zkUtils: ZkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperHosts), isSecureKafkaCluster)

AdminUtils.createTopic(zkUtils, topic, partitionSize, replicationCount, topicConfig)
zkClient.close()
}
}


The above code results in the error that ZKStringSerializer is inaccessible from his place.

I found several related post to creating topics (mostly in Java and before Kafka 9.0)
Creating a topic for Apache Kafka 0.9 Using Java
How create Kafka ZKStringSerializer in Java?
How Can we create a topic in Kafka from the IDE using API
And Finally
Creating a Kafka topic results in no leader

Based on these I updated by code as follows:

import kafka.admin.AdminUtils
import kafka.utils.ZkUtils
import kafka.utils.ZKStringSerializer$
import org.I0Itec.zkclient.{ZkClient, ZkConnection}
object Topic {
def CreateKafkaTopic(topic: String, zookeeperHosts: String, partitionSize: Int, replicationCount: Int, connectionTimeoutMs: Int = 10000, sessionTimeoutMs: Int = 10000): Boolean = {
var zkSerializer: ZKStringSerializer = ZKStringSerializer$.MODULE$
val zkClient: ZkClient= new ZkClient(zookeeperHosts, connectionTimeoutMs, sessionTimeoutMs, zkSerializer)
val topicConfig: Properties = new Properties()
val isSecureKafkaCluster: Boolean = false

val zkUtils: ZkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperHosts), isSecureKafkaCluster)

AdminUtils.createTopic(zkUtils, topic, partitionSize, replicationCount, topicConfig)
zkClient.close()
}
}


And then I just get unable to resolve symbol ZkStringSerialzer$ errors.

I tried both with the org.I0Itec.zkclient.serialize.ZkSerializer object as well and it did not make a difference.

So my Question is actually two fold:
1. What is the significance of the '$' character for the Import and Declarations statements in scala. I have used it in string interpolation ( e/g/ s"var value is $var")to reference variables but this seems different.
2. What is wrong with my code. Is it the way I am importing, declaring, something else?

I am new to scala and Play but I am feeling like quite and idiot at the moment so any advice / help is appreciated

~Dave

P.S.
In case it helps relevant bits from project files

build.sbt:

lazy val `api` = (project in file(".")).enablePlugins(PlayScala)
scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
"org.apache.kafka" % "kafka_2.11" % "0.9.0.1",
jdbc,
cache,
ws,
specs2 % Test
)


plugins.sbt:

resolvers += "Typesafe repository" at "http://repo.typesafe.com/typesafe/releases/"

addSbtPlugin("com.typesafe.play" % "sbt-plugin" % "2.5.4")

addSbtPlugin("com.typesafe.sbt" % "sbt-coffeescript" % "1.0.0")

addSbtPlugin("com.typesafe.sbt" % "sbt-less" % "1.0.0")

addSbtPlugin("com.typesafe.sbt" % "sbt-jshint" % "1.0.1")

addSbtPlugin("com.typesafe.sbt" % "sbt-rjs" % "1.0.1")

addSbtPlugin("com.typesafe.sbt" % "sbt-digest" % "1.0.0")

addSbtPlugin("com.typesafe.sbt" % "sbt-mocha" % "1.0.0")


build.properties:

sbt.version=0.13.5

Answer

After fighting this issue over the weekend I gave up on the ZKClient package that had been used previously and simple used Kafka directly which was actually much cleaner that trying to use the I0Itech ZKClient.

New implementation goes like this:

import java.util.Properties
import kafka.admin.AdminUtils
import kafka.utils.ZkUtils

class Topic {
  def CreateKafkaTopic(topic: String, zookeeperHosts: String, partitionSize: Int, replicationCount: Int, connectionTimeoutMs: Int = 10000, sessionTimeoutMs: Int = 10000): Boolean = {
    if (ListKafkaTopics(zookeeperHosts).contains(topic) ) {
      return false
    }
    val zkUtils = ZkUtils.apply(zookeeperHosts, sessionTimeoutMs, connectionTimeoutMs, false)
    AdminUtils.createTopic( zkUtils, topic, partitionSize, replicationCount, new Properties())
    zkUtils.close()
    true
  }
}

End the end removed a dependency and make cleaner code so a double win I suppose.

~Dave