我正在用一个标准的循环处理Kafka的信息:
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
processMessage(record);
}
}
如果我的Kafka Consumer在处理记录时超时了怎么办?我指的是由属性session.timeout.ms
控制的超时当这种情况发生时,我的Consumer应该停止处理记录,因为它会丢失它的分区,并且它处理的记录可能已经被另一个Consumer处理了。如果原始Consumer将一些处理结果写入数据库中,它可能会覆盖在我的原始消费者超时之后获得分区的“新”消费者产生的记录。
我知道ConsumerRebalanceListener,但据我所知,它的方法onPartitionsLost只有在我从消费者调用poll方法之后才会被调用,因此这无助于停止对我从上一次轮询中接收到的一批记录的处理循环。
我希望heartbeat线程可以通知我它无法联系代理,并且我们在消费者中有一个会话超时,但似乎没有类似的事情...我是否遗漏了什么?
1条答案
按热度按时间ddhy6vgd1#
添加此作为答案,因为在评论中太长。
Kafka有几种方法可以用来处理信息
你说你想把Kafka当作一次语义来使用(顺便说一下,这是最不常用的使用Kafka的方式),而且生产者需要很好地发挥,因为默认情况下Kafka可以不止一次地生产相同的信息。
构建使用“至少一次”机制的服务要常见得多,这样你就能收到多次(或处理)相同的邮件,但您需要有一种方法来消除重复数据(这与httpAPI的幂等性背后的思想是相同的)。您需要在消息中有一些唯一的内容,并注册该 id 已经被处理过。如果有效负载中没有可用于消除重复数据的内容,则可以在消息中添加一个头并使用它。
这在您必须重置偏移量的场景中也很有用,这样服务就可以在不中断的情况下遍历旧消息。
我建议你去谷歌一下如何实现上面的细节,这里有一篇来自confluent的关于开发exactly once semantics Improved Robustness and Usability of Exactly-Once Semantics in Apache Kafka的博客文章,以及解释不同语义的Kafka文档。
关于
ConsumerRebalanceListener
,如果你遵循在消费者中使用幂等性的解决方案,你不需要做任何事情。当应用崩溃时,也会发生重新平衡,在这种情况下,服务可能已经处理了一些记录,但还没有将它们提交给Kafka。我给予每一个开始写Kafka的人一个小提示。Kafka从外表看起来很简单,但它是一个复杂的技术。不要在生产中使用它,直到你知道它是如何工作的,包括做了大量的负面测试(除非你可以接受丢失数据)。