当Kafka重新平衡分区时,多示例微服务多次处理唯一的Kafka消息

8tntrjer  于 2023-03-11  发布在  Apache
关注(0)|答案(2)|浏览(140)

在生产环境中,我们有一个带有两个示例的微服务,并且使用相同的主题,但是使用相同的组ID(和不同的clientId)。因此,如果我们有5个分区,第一个示例占用3个,另一个占用2个。
但当我们部署到生产环境中时,

  • 我们关闭了一审。
  • 然后,第一个示例启动并运行
  • 然后部署第二个示例并关闭它
  • 最后,第二个示例启动并运行。

但是这个过程使Kafka在示例关闭时重新平衡分区
问题是在此之后我们处理了两次来自Kafka分区的唯一消息(我检查Kafka分区中没有重复,并且消息是唯一的)。我认为在重新平衡和处理期间,一个示例没有实现提交一些消息,另一个示例再次处理它。注意,我们已经设置了

AutoOffsetReset = AutoOffsetReset.Earliest,
EnableAutoCommit = false,

结果是一个独特的信息,从Kafka我们处理了两次。如何解决这个问题?
另外,当我们阅读Kafka的信息时,我们将它们存储在数据库中,然后将信息提交给Kafka。我很好奇如何解决这个问题...
如果我使用transactional consume可以解决这个问题吗?如果两个示例使用相同的消息开始事务呢?
例如,示例A在时间戳t1开始事务,id为1、2、3、4、5。示例B在时间戳t2(t1之后几毫秒)开始事务,id为4、5、6、7、8、9。
具有相同ID 4、5的消息发生了什么情况?
此外,我认为如果这是一个很好的选择,可以处理它由我使用,例如分发redis缓存和检查的id。

hkmswyz6

hkmswyz61#

您有两种选择:
1.通过使你的处理等幂来处理重复的读取。例如检查你的数据库是否已经存储了数据,如果是的话就删除记录。
1.自己处理偏移提交。在Java客户端KafkaConsumer类的Storing Offsets Outside Kafka部分有一个很好的解释。基本上,它允许您原子地处理数据库中的偏移和实际数据。注意,您需要自己处理重新平衡事件,这里有一个如何处理的示例:ConsumerRebalanceListener .
看起来您使用的是.NET客户端,因此等效方法是使用ConsumerBuilder#SetPartitionsRevokedHandler。

gojuced7

gojuced72#

如果你自己处理偏移提交,但只在你成功处理任何记录后提交,那么至少需要一次处理。特别是,如果你的消费者在进程轮询后和提交前重新平衡,那么它必须在重新平衡时寻找回最后提交的偏移。
是的,事务处理可以提供帮助,但只能在一个使用者会话中提供。如果重新平衡到一个全新的示例而不提交,则整个事务处理将再次使用。您需要在自己的数据库事务处理中合并此逻辑。
解决这个问题的方法是更频繁地提交,或者集中存储在高可用性数据库中处理过的值。但是如果你已经有了一个数据库,就没有理由添加Redis,除非你希望Redis更快。(从该数据存储查找每个事件可能导致更高的消费者延迟和网络IO,进一步增加消费者重新平衡的可能性,因此您需要增加轮询超时配置)

相关问题