在其中一个示例中,当使用者服务重新启动时,它会导致重新处理发送给kafka的所有记录。
Kafka Broker: 0.10.0.1
Kafka producer Service: Springboot version 1.4.3.Release
Kafka Consumer Springboot Service: Springboot version 2.2.0.Release
现在为了研究这个问题,我想在dev/local环境中重新创建这个场景,这是不可能的!!!
可能的原因是什么?
如何检查,如果记录一次是从消费端处理的,我们发送时是否提交 Acknowledgement.acknowledge();
消费者-财产
Enable Auto commit = false;
Auto offset Reset = earliest;
max poll records = 1;
max poll interval ms config = I am calculating the value of this parameter at runtime from the formula ==>> (number_of_retries * x * 2) <= INTEGER.MaxValue
重试策略-简单
number of retries = 3;
interval between retries = x (millis)
我在运行时通过bean在用户端创建主题 NewTopic(topic_name, 1, (**short**)1)
有2个kafka集群和1个zookeeper示例正在运行。
任何帮助都将不胜感激
1条答案
按热度按时间inkz8wg91#
那个经纪人很老了;如果使用者在24小时内未收到任何记录,则会删除偏移量,重新启动使用者将导致其重新处理所有记录。
对于较新的经纪人,它被改为7天,消费者必须停止7天的抵消被删除。
Spring Boot1.4.x(甚至1.5.x,2.0.x)不再受支持;当前版本是2.3.1。
您应该升级到一个更新的代理和一个更新的spring引导版本。