Mattan Bitner - 7 months ago
Node.js Question

Kafka to Elasticsearch consumption with node.js

I know there are quite a few node.js modules that implement a Kafka consumer that gets messages and writes them to Elasticsearch. But I only need some of the fields from each message, not all of them. Is there an existing solution I don't know about?


The question is asking for an example from node.js. The kafka-node module provides a very nice mechanism for getting a Consumer, which you can combine with the elasticsearch-js module:

// configure Elasticsearch client
var elasticsearch = require('elasticsearch');
var esClient = new elasticsearch.Client({
  // ... connection details ...
});

// configure Kafka Consumer
var kafka = require('kafka-node');
var Consumer = kafka.Consumer;
var client = new kafka.Client();
var consumer = new Consumer(
  client,
  [
    // ... topics / partitions ...
  ],
  { autoCommit: false }
);

consumer.on('message', function(message) {
  if (message.some_special_field === "drop") {
    return; // skip it
  }

  // drop fields (you can use delete message['field1'] syntax if you need
  //  to parse a more dynamic structure)
  delete message.field1;
  delete message.field2;
  delete message.field3;

  esClient.index({
    index: 'index-name',
    type: 'type-name',
    id: message.id_field, // ID will be auto generated if none/unset
    body: message
  }, function(err, res) {
    if (err) {
      throw err;
    }
  });
});

consumer.on('error', function(err) {
  console.log(err);
});

NOTE: Using the Index API is not a good practice when you have tons of messages being sent through, because it requires Elasticsearch to create a thread per operation, which is obviously wasteful, and it will eventually lead to rejected requests if the thread pool is exhausted as a result. In any bulk ingestion situation, a better solution is to look into something like Elasticsearch Streams (or Elasticsearch Bulk Index Stream, which builds on top of it), both of which build on the official elasticsearch-js client. However, I've never used those client extensions, so I don't really know how well they do or do not work, but usage would simply replace the part where I am showing the indexing happening.
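As a middle ground, the elasticsearch-js client also exposes a Bulk API directly. The following is only a minimal sketch of what batching might look like: BATCH_SIZE, toBulkBody, and flush are hypothetical names I'm introducing here, while 'index-name', 'type-name', and id_field carry over from the indexing example above.

```javascript
// A minimal sketch of batching with the elasticsearch-js Bulk API instead of
// one Index API call per message. BATCH_SIZE, toBulkBody, and flush are
// hypothetical names for this sketch.

var BATCH_SIZE = 500; // illustrative flush threshold
var buffer = [];      // messages waiting to be flushed

// Build the body format that esClient.bulk expects: an action/metadata line
// followed by the document source, repeated for each document.
function toBulkBody(messages) {
  var body = [];
  messages.forEach(function(message) {
    body.push({
      index: { _index: 'index-name', _type: 'type-name', _id: message.id_field }
    });
    body.push(message);
  });
  return body;
}

// Send everything currently in the buffer as a single bulk request.
function flush(esClient, done) {
  if (buffer.length === 0) {
    return done(null);
  }
  var body = toBulkBody(buffer);
  buffer = [];
  esClient.bulk({ body: body }, done);
}

// Inside the 'message' handler above, the esClient.index(...) call would
// then be replaced with something like:
//
//   buffer.push(message);
//   if (buffer.length >= BATCH_SIZE) {
//     flush(esClient, function(err) { if (err) throw err; });
//   }
```

The trade-off is latency versus throughput: messages sit in the buffer until the threshold is reached, so in practice you would also flush on a timer and on shutdown.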

I'm not convinced that the node.js approach is actually better than the Logstash one below in terms of maintenance and complexity, so I've left both here for reference.

The better approach is probably to consume Kafka from Logstash, then ship it off to Elasticsearch.

You should be able to use Logstash to do this in a straightforward way, using the Kafka input and the Elasticsearch output.

Each document in the Logstash pipeline is called an "event". The Kafka input assumes that it will receive JSON coming in (configurable by its codec), which will populate a single event with all of the fields from that message.

You can then drop the fields that you have no interest in handling, or conditionally drop the entire event.

input {
  # Receive from Kafka
  kafka {
    # ...
  }
}

filter {
  if [some_special_field] == "drop" {
    drop { } # skip the entire event
  }

  # drop specific fields
  mutate {
    remove_field => [
      "field1", "field2", ...
    ]
  }
}

output {
  # send to Elasticsearch
  elasticsearch {
    # ...
  }
}
Naturally, you'll need to configure the Kafka input (from the first link) and the Elasticsearch output (from the second link).
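For concreteness, a hypothetical minimal version of those two sections might look like the following. The broker address, topic, host, and index name are all placeholders, and the option names assume recent versions of the kafka input and elasticsearch output plugins (older plugin versions used different option names):

```
input {
  kafka {
    bootstrap_servers => "localhost:9092"  # placeholder broker address
    topics => ["my-topic"]                 # placeholder topic name
    codec => "json"                        # parse each message as JSON
  }
}

output {
  elasticsearch {
    hosts => ["localhost:9200"]            # placeholder Elasticsearch host
    index => "index-name"                  # placeholder index name
  }
}
```

Check the plugin documentation for the version of Logstash you're running before copying these option names.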