quux00 quux00 - 2 years ago 181
Java Question

How create Kafka ZKStringSerializer in Java?

In searching for how to create a Kafka topic through the API, I found this example in Scala:

import kafka.admin.AdminUtils
import kafka.utils.ZKStringSerializer
import org.I0Itec.zkclient.ZkClient

// Create a ZooKeeper client
val sessionTimeoutMs = 10000
val connectionTimeoutMs = 10000
val zkClient = new ZkClient("zookeeper1:2181", sessionTimeoutMs,
connectionTimeoutMs, ZKStringSerializer)

// Create a topic with 8 partitions and a replication factor of 3
val topicName = "myTopic"
val numPartitions = 8
val replicationFactor = 3
val topicConfig = new Properties
AdminUtils.createTopic(zkClient, topicName,
numPartitions, replicationFactor, topicConfig)

Source: http://stackoverflow.com/a/23360100/871012

The last arg
is apparently a Scala object. It is not clear to me how to make this example work in Java.

This post How to create a scala object in clojure asks the same question in Clojure and the answer was:


which in Java would (I think) translate to:


But when I try that (or any number of other variations) none of them compile.

The compilation error is:

KafkaTopicCreator.java:[16,18] cannot find symbol
symbol: variable ZKStringSerializer$
location: class org.sample.KafkaTopicCreator

I am using kafka_2.9.2- and Java 8.

Answer Source

For java try the following,

First import below statement

import kafka.utils.ZKStringSerializer$;

Create object for ZkClient in the following way,

String zkHosts = ""; //If more than one zookeeper then ","
ZkClient zkClient = new ZkClient(zkHosts, 10000, 10000, ZKStringSerializer$.MODULE$);
AdminUtils.createTopic(zkClient, myTopic, 10, 1, new Properties());

The above code won't work for kafka > 0.9 since the api has been changed, Use the below code for kafka > 0.9

import java.util.Properties;
import kafka.admin.AdminUtils;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;

public class KafkaTopicCreationInJava
    public static void main(String[] args) throws Exception {
        ZkClient zkClient = null;
        ZkUtils zkUtils = null;
        try {
            String zookeeperHosts = ""; // If multiple zookeeper then -> String zookeeperHosts = ",";
            int sessionTimeOutInMs = 15 * 1000; // 15 secs
            int connectionTimeOutInMs = 10 * 1000; // 10 secs

            zkClient = new ZkClient(zookeeperHosts, sessionTimeOutInMs, connectionTimeOutInMs, ZKStringSerializer$.MODULE$);
            zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperHosts), false);

            String topicName = "testTopic";
            int noOfPartitions = 2;
            int noOfReplication = 3;
            Properties topicConfiguration = new Properties();

            AdminUtils.createTopic(zkUtils, topicName, noOfPartitions, noOfReplication, topicConfiguration);

        } catch (Exception ex) {
        } finally {
            if (zkClient != null) {
Recommended from our users: Dynamic Network Monitoring from WhatsUp Gold from IPSwitch. Free Download