kafka/rabbitmq中的每条消息确认

kiayqfof  于 2021-06-04  发布在  Kafka
关注(0)|答案(4)|浏览(474)

我们有一个工作rabbitmq.implementation,由于体积,我们计划切换到Kafka。
我有一点怀疑。
在rabbitmq中,当使用者使用来自q的消息时,消息进入另一个阶段,即未确认阶段。客户机/使用者需要一些时间来处理消息,处理成功后,它会向q发送一个确认消息,并且消息会从q中删除。如果不成功,则在定义的时间段之后,如果q没有得到确认,则消息将附加在q的末尾。这样我们就不会失去任何信息。
以我对Kafka所知甚少,我了解到,例如,如果消息100未成功处理,偏移量不会增加,但如果消息101成功处理,偏移量会增加。所以我把留言丢了。
有没有办法保证这些信息不会丢失。

eulz3vhy

eulz3vhy1#

我也面临同样的问题。如果我想用一种简单的方式,rabbitmq会记录每个
已发布但未消费
已发布、已使用和未确认的消息。
Kafka没有,所以你不能把它准备好,你必须自己去实现它。
不过也有选择,使用kmq,性能会变得不到50%,看看吧
https://softwaremill.com/kafka-with-selective-acknowledgments-performance/

63lcw9qa

63lcw9qa2#

Kafka不会从主题中删除消息,除非它到达
log.retention.bytes log.retention.hours log.retention.minutes log.retention.ms 配置。因此,如果偏移量增加,您不会丢失以前的消息,您可以简单地更改偏移量到您想要的位置。

whlutmcx

whlutmcx3#

除非轮询新邮件,否则不会增加邮件偏移量。所以你必须关心重新处理你的信息。
如果要将数据处理结果存储到kafka集群,可以使用kafka的事务特性。这样您就可以支持一次交货。您的所有更改都将被保存或不被存储。
另一种方法是使处理场景是幂等的。您将为kafka中的每条消息分配一个唯一的id。处理消息时,将id存储在数据库中。崩溃后,通过查看数据库检查消息id是否已被处理。

rdlzhqv9

rdlzhqv94#

你应该读一点关于Kafka的信息消费是如何工作的。以下是Kafka官方文档的消费者部分的链接:https://kafka.apache.org/documentation/#theconsumer
基本上,在kafka中,消息只有在经过足够的时间后才会被删除,这是使用 log.retention.hours , log.retention.minutes 以及 log.retention.ms 就像阿明说的。
在kafka中,任何数量的消费者都可以随时开始消费来自任何主题的消息,而不管其他消费者是否已经在消费来自同一主题的消息。kafka使用存储在kafka本身中的偏移量跟踪每个使用者在每个主题/分区上的位置。因此,如果您的消费者需要消费message 100,如您在问题中所述,您可以简单地“倒带”到所需的消息,然后重新开始正常消费。不管你以前是否消费过它,或者其他消费者是否阅读过该主题。
Kafka官方文件:
使用者可以故意回退到旧的偏移量并重新使用数据。这违反了队列的公共约定,但对许多消费者来说却是一个基本特性。例如,如果使用者代码有一个bug,并且在某些消息被使用后被发现,那么一旦bug被修复,使用者就可以重新使用这些消息。

相关问题