我们要处理的场景是,在同一个应用程序中,不同的使用者需要不同的重试策略。
请参阅下图(简要架构图):
这个 main_consumer
消耗来自的有效负载 main_topic
并尝试将其发送到api。如果api处理失败,我们将把这个失败的负载写入另一个名为 error_topic
. 有一个不同的消费者 (error_consumer)
从中消耗 error_topic
并通过3次重试尝试再次将有效负载发送到api。如果仍然是失败,那么 error_consumer
将有效载荷推入 DLQ
.
我们面临的问题:
我们需要 main_consumer
失败时不重试 error_consumer
失败时重试3次。我们拿走了 maxAttempts
为1 main_consumer
以及 maxAttempts
作为3 error_consumer
. 但是有了这个配置, main_consumer
正在重试3次 error_consumer
一次。它的工作原理与我们预期的完全相反。
p、 s:我们试着换了 maxAttempts
对于两个消费者(这是不合逻辑的)都是徒劳的。
下面是我们正在使用的spring云流应用程序配置:
我们正在使用以下两个配置文件运行应用程序。
应用程序-main.yml
spring:
cloud:
stream:
kafka:
bindings:
main-consumer-channel:
consumer:
autoCommitOffset: false
bindings:
main-consumer-channel:
destination: main_topic
consumer:
maxAttempts: 1
backOffInitialInterval: 5000
backOffMultiplier: 2
application-error-retry.yml文件
spring:
cloud:
stream:
kafka:
bindings:
error-consumer-channel:
consumer:
autoCommitOffset: false
bindings:
error-consumer-channel:
destination: error_topic
consumer:
maxAttempts: 3
backOffInitialInterval: 5000
backOffMultiplier: 2
2条答案
按热度按时间yx2lnoni1#
根据Spring文件-https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/_configuration_options.html,maxattempts配置应处于
“spring.cloud.stream.bindings..consumer。”
在您的配置中,它看起来像在
“spring.cloud.stream.kafka.bindings..consumer。”
maxattempts似乎不是Kafka绑定道具的有效配置-https://github.com/spring-cloud/spring-cloud-stream-binder-kafka
n6lpvg4x2#
这对我来说很好。。。
和