防止合流kafka在生成时丢失消息

9rygscc1  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(326)

confluent kafka库(本例中为python版本)有一个接受传递回调函数的produce方法:

kafka_producer.produce(topic=topic,
                            key=key,
                            value=value,
                            on_delivery=delivery_callback)

无论消息是否成功传递,都会调用此回调:

def delivery_callback(err, msg):

如果消息失败,我在这个函数中没有任何重试逻辑,因为文档说它是异步的。
相反,每100条左右的信息,我依靠 flush() 如果有任何消息未成功生成,请告诉我:

messages_outstanding = kafka_producer.flush()
if messages_outstanding == 0:
   //continue to the next batch of 100
else:
   //produce the batch again

威尔 flush() 是否有任何未能生成的消息(报告为错误 delivery_callback )
换句话说,我能确定吗 flush() 如果任何消息失败,不会返回零?

pbwdgjma

pbwdgjma1#

确认了以下结果:
打电话 .flush() 即使消息未能生成,也绝对可以返回零。此方法似乎要等到所有消息的所有传递回调都已完成(回调可以简单地报告消息未能传递)。
从我们的Angular 来看,整件事令人惊讶地尴尬。如果不能承受消息丢失的代价,则需要检测何时传递回调失败,并实现某种形式的重试逻辑来覆盖失败的消息。

相关问题