Shades88 Shades88 - 3 months ago 34
Java Question

java client example for Kafka Producer, send method not accepting KeyedMessage

I am running kafka 2.9.1-0.8.2.1. I included jars provided in libs/ directory within main kafka directory. Now I am trying to run a java producer example as per what is given here https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+Producer+Example. Now

producer.send
method seems to be accepting this kind of argument
Seq<KeyedMessage<String, String>>
. In the example, object of KeyedMessage is not converted into anything. When I try to do the same I get incompatible types compiler error.

Here's the code

import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import java.util.Properties;
import kafka.producer.Producer;
import scala.collection.Seq;

public class KakfaProducer {
public static void main(String [] args) {
Properties prop = new Properties();
prop.put("metadata.broker.list", "localhost:9092");
prop.put("serializer.class","kafka.serializer.StringEncoder");
//prop.put("partitioner.class", "example.producer.SimplePartitioner");
ProducerConfig producerConfig = new ProducerConfig(prop);
Producer<String,String> producer = new <String,String>Producer(producerConfig);
String topic = "test";
KeyedMessage<String,String> message = new <String,String>KeyedMessage(topic, "Hello Test message");
producer.send(message);
producer.close();
}
}


And that commented code is giving me class def not found exception. I tried to look a lot on net, but it's not helping.

There are two kinds of jars in that libs/ directory. One is kafka-client and other one is just kafka and version number. Am I including wrong jar? Which one do I need to work with?

Answer

For the first problem, instead of importing scala API, import Java one. So, instead of using:

import kafka.producer.Producer;

please use:

import kafka.javaapi.producer.Producer;

SimplePartitioner code can be found below. Add it to the corresponding directory:

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

public class SimplePartitioner implements Partitioner {
    public SimplePartitioner (VerifiableProperties props) {
    }

    public int partition(Object key, int numPartitions) {
        int partition = 0;
        String stringKey = (String) key;
        int offset = stringKey.lastIndexOf('.');
        if (offset > 0) {
           partition = Integer.parseInt( stringKey.substring(offset+1)) % numPartitions;
        }
       return partition;
  }
}