kafka如何用topic/partition/offset实现一次消息传递逻辑

1hdlvixo  于 2021-06-08  发布在  Kafka
关注(0)|答案(2)|浏览(414)

我设法在方法中获得用@kafkalistener注解的topic/partition/offset,但是如何使用这些数据来实现一个只有一次的使用者逻辑呢?
我正在使用ConcurrentKafkalListenerContainerFactory,设置为Concurrent=4,并将ackmode设置为manual。我目前的方法是使用redis进行重复数据消除:我使用topic:partition as redis键,offset作为其值,然后将即将到来的offset与redis中的值进行比较,如果offset比redis中的值新(大),那么继续执行业务逻辑,否则我忽略消息。最后提交偏移量(ack.acknowledge())
但这种方法不起作用,例如,如果重新平衡发生在ack.acknowledge()完成之前,则会出现以下错误:org.apache.kafka.clients.consumer.commitfailedexception,
在重新平衡之后,原始分区被分配给另一个线程,这将导致同一消息被消耗两次。
因此,一句话,如何设计一个逻辑,使每个Kafka消息传递恰好一次?

hwazgwia

hwazgwia1#

你必须写出最后一个原子处理的偏移量,连同处理的结果,在Kafka之外。这可以是一个数据库或文件,只是不要做两次写入,使它成为一个单一的原子写入数据和偏移量。如果您的使用者崩溃,并且它或另一个示例重新启动或接管,您需要首先确保它读取与最后一个处理结果一起存储的最后一个偏移量,并在poll()获取更多消息之前将seek()定位到该位置。这是多少现有的Kafka接收器连接器可以实现eos消费今天。

ckx4rj1h

ckx4rj1h2#

Kafka还不完全支持一次。将在0.11.0.0版本中提供:https://issues.apache.org/jira/browse/kafka-4923 此版本计划于2017年6月14日发布,因此您可以等待或自行构建此复杂逻辑;-)

相关问题