我使用的是springboot2.1.1.release和springcloudgreenwich.rc2,springcloudstreambinder kafka的托管版本是2.1.0rc4。Kafka的版本是1.1.0。我设置了以下属性,因为如果出现错误,则不应使用这些消息。
spring.cloud.stream.bindings.input.group=consumer-gp-1
...
spring.cloud.stream.kafka.bindings.input.consumer.autoCommitOnError=false
spring.cloud.stream.kafka.bindings.input.consumer.enableDlq=false
spring.cloud.stream.bindings.input.consumer.max-attempts=3
spring.cloud.stream.bindings.input.consumer.back-off-initial-interval=1000
spring.cloud.stream.bindings.input.consumer.back-off-max-interval=3000
spring.cloud.stream.bindings.input.consumer.back-off-multiplier=2.0
....
kafka主题中有20个分区,kerberos用于身份验证(不确定这是否相关)。
kafka使用者正在为它处理的每一条消息调用一个web服务,如果web服务不可用,那么我预计使用者将尝试处理该消息3次,然后再转到下一条消息。所以在我的测试中,我禁用了webservice,因此没有一条消息能够被正确处理。从日志中我可以看出这正在发生。
过了一会儿,我停止并重新启动了kafka消费程序(webservice仍然处于禁用状态)。我希望在kafka消费者重启之后,它会尝试处理第一次未成功处理的消息。从日志(我打印出每条消息及其字段)来看,在kafka消费者重启之后,我看不到这种情况发生。我认为分区可能会影响某些东西,但我检查了日志,所有20个分区都被分配给了这个使用者。
有我错过的房产吗?当我第二次重新启动消费者时,我认为预期的行为是kafka代理会再次将未成功处理的记录传递给消费者。
谢谢
1条答案
按热度按时间dly7yett1#
参数按预期工作。参见注解。