重新解读Kafka信息的可能原因

fjaof16o  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(553)

昨天我从日志中发现,Kafka在Kafka小组协调人发起小组重新平衡后,正在重新整理一些信息。这些消息已在两天前被使用(从日志中确认)。
日志中报告了另外两个重新平衡,但他们不再重新汇总消息。那么,为什么第一次重新切换会导致重新消费的消息?有什么问题?
我正在使用golang kafka客户机。这是密码

config := sarama.NewConfig()
config.Version = version
config.Consumer.Offsets.Initial = sarama.OffsetOldest

而且我们是在声明消息之前处理消息的,所以我们似乎对Kafka使用了至少发送一次的策略。我们在一台机器上有三个代理,在另一台机器上只有一个用户线程(go routine)。
有什么解释吗?我想这些信息一定是被提交的,因为它们是两天前被消费的,或者Kafka为什么要在没有提交的情况下保留超过两天的偏移量呢?
使用代码示例:

func (consumer *Consumer) ConsumeClaim(session 
sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {

for message := range claim.Messages() {
    realHanlder(message)   // consumed data here
    session.MarkMessage(message, "") // mark offset
}

return nil
}

补充:
重新平衡发生在应用程序重启之后。另外还有两次重启没有成功
Kafka的形象
log.retention.check.interval.ms=300000
log.retention.hours=168小时
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
delete.topic.enable=真
auto.create.topics.enable=假

hsgswve4

hsgswve41#

通过阅读golang saram客户端和kafka服务器的源代码,我最终找到了以下原因
消费组偏移保留时间是24小时,这是kafka的默认设置,而日志保留时间是我们明确设置的7天。
我的服务器应用是在测试环境中运行的,很少有人可以访问,这意味着Kafka制作者可能会产生很少的消息,然后消费者组会有很少的消息要消费,因此消费者可能会很长时间不提交任何偏移量。
由于偏移量配置的原因,当消费偏移量没有更新超过24小时时,kafka代理/协调器将从分区中删除消费偏移量。下次saram从kafka代理那里查询偏移量时,客户机当然什么也得不到。注意,我们使用sarama.offsetoldest作为初始值,然后sarama客户端将使用kafka代理保存的消息的开头的消息,这将导致消息重新消耗,这很可能发生,因为日志保留时间是7天

相关问题