medium medium - 26 days ago 11
Java Question

Kafka 2.9.2-0.8.1.1 no KeyedMessage parameter for producer.send

I am trying to send a message to Kafka in java. My project is using kafka_2.9.2-0.8.1.1.jar via the maven dependency:

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.9.2</artifactId>
<version>0.8.1.1</version>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>


The problem I am having is with my producer. I want to send a KeyedMessage and via the documentation here: http://kafka.apache.org/documentation.html#producerapi it states that the producer.send method should take a KeyedMessage for its parameter. When I inspect the options avaiable for the producer.send call, it only allows a ProducerData object as an acceptable paramater.

My code is set up like so:

private String topic;
private String key;
private Properties props = new Properties();
private Producer<String,String> producer;

props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("zk.connect", "127.0.0.1:2181");

//create the producer
producer = new Producer<String, String>(new ProducerConfig(props));

String topic = "test";
String key = "test_key";
String message = "test_msg";

KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, key, message);
producer.send(data) <---- ERROR is here, send method not allowed param of KeyedMessage


The actual error is this states as this:

The method send(ProducerData<String,String>) in the type Producer<String,String> is not applicable for the arguments (KeyedMessage<String,String>)

Answer

I think you need to define

 props.put("partitioner.class", "example.producer.SimplePartitioner");

The wiki page says

The third property "partitioner.class" defines what class to use to determine which Partition in the Topic the message is to be sent to. This is optional, but for any non-trivial implementation you are going to want to implement a partitioning scheme. More about the implementation of this class later. If you include a value for the key but haven't defined a partitioner.class Kafka will use the default partitioner. If the key is null, then the Producer will assign the message to a random Partition.

Comments