Alban Alban - 5 months ago 20
Java Question

Kafka: Java client fails to send messages after x tries

I am trying to send messages to Kafka from a Java application.

All I can get is "Failed to send messages after 2 tries":

Exception in thread "main" kafka.common.FailedToSendMessageException: Failed to send messages after 2 tries.
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
    at kafka.producer.Producer.send(Producer.scala:77)
    at kafka.javaapi.producer.Producer.send(Producer.scala:33)
    at test.Main.main(Main.java:26)


Kafka is running on a remote machine, so I added the following to its server.properties (let's say that the Kafka server's IP address is 192.168.0.1):

host.name=192.168.0.1
advertised.host.name=192.168.0.1
advertised.port=9092


The running Kafka is kafka_2.11-0.8.2.1, so I used what I guess is the appropriate Java client version:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.8.2.1</version>
</dependency>


The Java code:

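package test;

import java.util.Date;
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class Main {
    public static void main(String[] args) {
        // Configuration for the old (Scala) producer API
        Properties properties = new Properties();
        properties.put("metadata.broker.list", "192.168.0.1:9092");
        properties.put("serializer.class", "test.StringEncoder");
        properties.put("key.serializer.class", "test.StringEncoder");
        properties.put("message.send.max.retries", "2");

        Producer<String, String> kafkaProducer = new Producer<String, String>(new ProducerConfig(properties));

        // Send a single message without a key to the LOG topic
        kafkaProducer.send(new KeyedMessage<String, String>(
            "LOG",
            "Yo! " + new Date().toString()
        ));

        kafkaProducer.close();
    }
}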


The LOG topic has already been created. From the same machine that runs the Java code, I am able to send messages to Kafka with the console producer, and this works:

bin/kafka-console-producer.sh --broker-list 192.168.0.1:9092 --topic LOG


When the Java code fails, nothing is logged by either Kafka or ZooKeeper.

Is there a specific parameter I missed?

Answer

Apache Kafka 0.8.2 ships a new Java producer client (kafka-clients), which is generally preferable to the old Scala producer:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.8.2.0</version>
</dependency>

Example: https://github.com/CameronGregory/kafka/blob/master/TestProducer.java
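As a rough sketch of what switching to the new client involves, the settings from the question map onto the new producer's configuration keys roughly as below. The class name NewProducerConfigSketch and the build helper are hypothetical and exist only for illustration. The producer calls are left as comments because they need kafka-clients on the classpath and a reachable broker.

```java
import java.util.Properties;

// Hypothetical helper class; the name is illustrative only.
class NewProducerConfigSketch {

    // Builds the settings read by org.apache.kafka.clients.producer.KafkaProducer.
    static Properties build(String bootstrapServers) {
        Properties props = new Properties();
        // Takes the place of "metadata.broker.list" from the old producer.
        props.put("bootstrap.servers", bootstrapServers);
        // Take the place of "key.serializer.class" and "serializer.class".
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Takes the place of "message.send.max.retries".
        props.put("retries", "2");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("192.168.0.1:9092");
        // With kafka-clients on the classpath, usage would look roughly like:
        //   KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
        //   producer.send(new ProducerRecord<String, String>("LOG", "Yo! " + new Date()));
        //   producer.close();
        System.out.println(props);
    }
}
```

Note that send() in the new client is asynchronous and returns a Future, so calling get() on the result (or passing a callback) is one way to surface a delivery error instead of losing it silently.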

Apart from that, your config looks OK. Is "test.StringEncoder" your own custom class? Try using the built-in "kafka.serializer.StringEncoder" instead.
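If you stay on the old producer, a minimal sketch of the adjusted configuration could look like this. It assumes the custom encoder is the culprit, which the question does not confirm. OldProducerConfigSketch and its build helper are hypothetical names used only for illustration. Every other setting is taken unchanged from the question.

```java
import java.util.Properties;

// Hypothetical helper class; the name is illustrative only.
class OldProducerConfigSketch {

    // Builds the settings to pass to kafka.producer.ProducerConfig.
    static Properties build(String brokerList) {
        Properties props = new Properties();
        props.put("metadata.broker.list", brokerList);
        // Built-in encoder shipped with Kafka, used instead of test.StringEncoder.
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("key.serializer.class", "kafka.serializer.StringEncoder");
        props.put("message.send.max.retries", "2");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("192.168.0.1:9092");
        // In the question's code this would be used as:
        //   new Producer<String, String>(new ProducerConfig(props));
        System.out.println(props);
    }
}
```

If sending works with the built-in encoder, the problem most likely lies in the custom class rather than in the broker settings.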