// configure Elasticsearch client
var elasticsearch = require('elasticsearch');
var esClient = new elasticsearch.Client({
// ... connection details ...
});
// configure Kafka Consumer
var kafka = require('kafka-node');
var Consumer = kafka.Consumer;
var client = new kafka.Client();
var consumer = new Consumer(
client,
[
// ... topics / partitions ...
],
{ autoCommit: false }
);
consumer.on('message', function(message) {
if (message.some_special_field === "drop") {
return; // skip it
}
// drop fields (you can use delete message['field1'] syntax if you need
// to parse a more dynamic structure)
delete message.field1;
delete message.field2;
delete message.field3;
esClient.index({
index: 'index-name',
type: 'type-name',
id: message.id_field, // ID will be auto generated if none/unset
body: message
}, function(err, res) {
if (err) {
throw err;
}
});
});
consumer.on('error', function(err) {
console.log(err);
});
2条答案
按热度按时间7vhp5slm1#
问题是要从node.js中获取一个示例。这个
kafka-node
模块为获取Consumer
,您可以将其与elasticsearch js模块结合使用:注意:当发送大量消息时,使用索引api不是一个好的做法,因为它要求elasticsearch为每个操作创建一个线程,这显然是浪费,如果线程池因此耗尽,它最终将导致拒绝请求。在任何批量摄取情况下,更好的解决方案是使用类似elasticsearch streams(或构建在它之上的elasticsearch批量索引流)的东西,它构建在官方elasticsearch js客户端之上。但是,我从来没有使用过这些客户机扩展,所以我不知道它们的工作情况如何,但是使用它们只会取代我显示索引发生的部分。
我不相信node.js方法在维护和复杂性方面实际上比下面的logstash方法更好,所以我把这两种方法都留在这里作为参考。
更好的方法可能是从logstash消费kafka,然后将其运送到elasticsearch。
您应该能够使用logstash直接使用kafka输入和elasticsearch输出来实现这一点。
日志存储管道中的每个文档称为“事件”。kafka输入假定它将接收传入的json(可由其编解码器配置),它将用该消息中的所有字段填充单个事件。
然后,您可以删除那些对处理不感兴趣的字段,或者有条件地删除整个事件。
当然,您需要配置kafka输入(从第一个链接)和elasticsearch输出(以及第二个链接)。
h4cxqtbf2#
前面的答案是不可扩展的生产。
您必须使用elasticsearch批量api。你可以使用这个npm包https://www.npmjs.com/package/elasticsearch-kafka-connect 它允许您将数据从kafka发送到es(es到kafka的双工连接到2019年5月仍在开发中)