kafka使用node.js搜索elasticsearch消费

ar7v8xwq  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(337)

我知道有相当多的node.js模块实现了一个kafka消费者,它获取msg并写入到elastic。但是我只需要每个msg的一些字段,而不是所有字段。是否有我不知道的现有解决方案?

7vhp5slm

7vhp5slm1#

问题是要从node.js中获取一个示例。这个 kafka-node 模块为获取 Consumer ,您可以将其与elasticsearch js模块结合使用:

// configure Elasticsearch client
var elasticsearch = require('elasticsearch');
var esClient = new elasticsearch.Client({
  // ... connection details ...
});
// configure Kafka Consumer
var kafka = require('kafka-node');
var Consumer = kafka.Consumer;
var client = new kafka.Client();
var consumer = new Consumer(
  client,
  [
    // ... topics / partitions ...
  ],
  { autoCommit: false }
);

consumer.on('message', function(message) {
  if (message.some_special_field === "drop") {
    return; // skip it
  }

  // drop fields (you can use delete message['field1'] syntax if you need
  //  to parse a more dynamic structure)
  delete message.field1;
  delete message.field2;
  delete message.field3;

  esClient.index({
    index: 'index-name',
    type: 'type-name',
    id: message.id_field, // ID will be auto generated if none/unset
    body: message
  }, function(err, res) {
    if (err) {
      throw err;
    }
  });
});

consumer.on('error', function(err) {
  console.log(err);
});

注意:当发送大量消息时,使用索引api不是一个好的做法,因为它要求elasticsearch为每个操作创建一个线程,这显然是浪费,如果线程池因此耗尽,它最终将导致拒绝请求。在任何批量摄取情况下,更好的解决方案是使用类似elasticsearch streams(或构建在它之上的elasticsearch批量索引流)的东西,它构建在官方elasticsearch js客户端之上。但是,我从来没有使用过这些客户机扩展,所以我不知道它们的工作情况如何,但是使用它们只会取代我显示索引发生的部分。
我不相信node.js方法在维护和复杂性方面实际上比下面的logstash方法更好,所以我把这两种方法都留在这里作为参考。
更好的方法可能是从logstash消费kafka,然后将其运送到elasticsearch。
您应该能够使用logstash直接使用kafka输入和elasticsearch输出来实现这一点。
日志存储管道中的每个文档称为“事件”。kafka输入假定它将接收传入的json(可由其编解码器配置),它将用该消息中的所有字段填充单个事件。
然后,您可以删除那些对处理不感兴趣的字段,或者有条件地删除整个事件。

input {
  # Receive from Kafka
  kafka {
    # ...
  }
}

filter {
  if [some_special_field] == "drop" {
    drop { } # skip the entire event
  }

  # drop specific fields
  mutate {
    remove_field => [
      "field1", "field2", ...
    ]
  }
}

output {
  # send to Elasticsearch
  elasticsearch {
    # ...
  }
}

当然,您需要配置kafka输入(从第一个链接)和elasticsearch输出(以及第二个链接)。

h4cxqtbf

h4cxqtbf2#

前面的答案是不可扩展的生产。
您必须使用elasticsearch批量api。你可以使用这个npm包https://www.npmjs.com/package/elasticsearch-kafka-connect 它允许您将数据从kafka发送到es(es到kafka的双工连接到2019年5月仍在开发中)

相关问题