不同的重试策略不同的消费者在Kafka

cclgggtu  于 2021-06-04  发布在  Kafka
关注(0)|答案(2)|浏览(458)

我们要处理的场景是,在同一个应用程序中,不同的使用者需要不同的重试策略。
请参阅下图(简要架构图):

这个 main_consumer 消耗来自的有效负载 main_topic 并尝试将其发送到api。如果api处理失败,我们将把这个失败的负载写入另一个名为 error_topic . 有一个不同的消费者 (error_consumer) 从中消耗 error_topic 并通过3次重试尝试再次将有效负载发送到api。如果仍然是失败,那么 error_consumer 将有效载荷推入 DLQ .
我们面临的问题:
我们需要 main_consumer 失败时不重试 error_consumer 失败时重试3次。我们拿走了 maxAttempts 为1 main_consumer 以及 maxAttempts 作为3 error_consumer . 但是有了这个配置, main_consumer 正在重试3次 error_consumer 一次。它的工作原理与我们预期的完全相反。
p、 s:我们试着换了 maxAttempts 对于两个消费者(这是不合逻辑的)都是徒劳的。
下面是我们正在使用的spring云流应用程序配置:
我们正在使用以下两个配置文件运行应用程序。
应用程序-main.yml

spring:
  cloud:
    stream:
      kafka:
        bindings:
          main-consumer-channel:
            consumer:
              autoCommitOffset: false
      bindings:
        main-consumer-channel:
          destination: main_topic
          consumer:
            maxAttempts: 1
            backOffInitialInterval: 5000
            backOffMultiplier: 2

application-error-retry.yml文件

spring:
  cloud:
    stream:
      kafka:
        bindings:
          error-consumer-channel:
            consumer:
              autoCommitOffset: false
      bindings:
        error-consumer-channel:
          destination: error_topic
          consumer:
             maxAttempts: 3
             backOffInitialInterval: 5000
             backOffMultiplier: 2
yx2lnoni

yx2lnoni1#

根据Spring文件-https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/_configuration_options.html,maxattempts配置应处于
“spring.cloud.stream.bindings..consumer。”
在您的配置中,它看起来像在
“spring.cloud.stream.kafka.bindings..consumer。”
maxattempts似乎不是Kafka绑定道具的有效配置-https://github.com/spring-cloud/spring-cloud-stream-binder-kafka

n6lpvg4x

n6lpvg4x2#

这对我来说很好。。。

@SpringBootApplication
@EnableBinding(Inputs.class)
public class So57522645Application {

    public static void main(String[] args) {
        SpringApplication.run(So57522645Application.class, args);
    }

    @StreamListener("input1")
    public void listen1(String in) {
        System.out.println("main: " + in);
        throw new RuntimeException("fail");
    }

    @StreamListener("input2")
    public void listen2(String in) {
        System.out.println("error: " + in);
        throw new RuntimeException("fail");
    }

    @StreamListener("input3")
    public void listen3(String in) {
        System.out.println("final: " + in);
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template) {
        return args -> template.send("main", "test".getBytes());
    }

}

interface Inputs {

    @Input
    MessageChannel input1();

    @Input
    MessageChannel input2();

    @Input
    MessageChannel input3();

}
spring:
  cloud:
    stream:
      bindings:
        input1:
          consumer:
            max-attempts: 1
          destination: main
          group: grp1
        input2:
          consumer:
            max-attempts: 3
          destination: error.main.grp1
          group: grp2
        input3:
          destination: error.error.main.grp1.grp2
          group: grp3
      kafka:
        bindings:
          input1:
            consumer:
              enable-dlq: true
          input2:
            consumer:
              enable-dlq: true

main: test
error: test
error: test
error: test
final: test

相关问题