使用dlq和elasticsearch的kafka消费者

qc6wkl3g  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(367)

我有以下群集:
Kafka->一些日志收集器->elasticsearch
我的问题是如何选择最有效的日志收集器(或者其他一些软件,它允许管理kafka和elasticsearch之间的数据流)。
我试着从logstash、fluentd和confluent的kafka elasticsearch连接器中进行选择。我面临的主要问题是,在kafka中,在写入elasticsearch端点时遇到问题,无法回滚偏移量。
例如,logstash doc说“如果启用,400和404错误将被发送到死信队列(dlq)。如果未启用dlq,则会发出日志消息,并删除事件“(https://www.elastic.co/guide/en/logstash/6.x/plugins-outputs-elasticsearch.html#_retry_policy). 如果我有这样的错误,logstash会继续从Kafka那里读取数据。错误会一次又一次地发生。虽然,我的所有数据都将存储到dlq中,但当第一个错误发生时,kafka的偏移量将被移到远离该位置的地方。我必须手动定义正确的偏移量。
所以,我的问题是:有没有kafka和elasticsearch的连接器,它允许在收到elasticsearch(400/404)的第一个错误后停止移动偏移?
提前谢谢。

pvcm50d1

pvcm50d11#

我认为问题不在于效率,而在于可靠性
我面临的主要问题是,在kafka中,在写入elasticsearch端点时遇到问题,无法回滚偏移量。
我对connect或logstash的dlq特性没有太多经验,但是重置消费组偏移量并不是不可能的。但是,如果使用者应用程序正确处理偏移提交,则不需要这样做。
如果connect向es抛出连接错误,它将重试,而不是提交偏移量。
如果错误不可恢复,那么connect将停止使用not commit偏移量。
因此,从消息批处理中获取丢失数据的唯一方法是,如果该批处理最终位于dlq中,则使用任何框架。
如果禁用了dlq,那么丢失数据的唯一方法就是从kafka过期

相关问题