java: How do I change the number of DELIVERY_ATTEMPTs before failure in a KafkaListener?

hc2pp10m, posted on 2023-06-20 in Java

Suppose I have the following listener:

@KafkaListener(topics = "${some_topic}", autoStartup = "true")
public void listenForMessage(String message) {
    log.warn("Accepted message");
    if (true) {
        throw new RuntimeException("some error");
    }
}

I send a message to the topic ${some_topic} and see the following:

2023-06-15 10:00:03.311  WARN 33580 --- [ntainer#0-0-C-1] r.v.u.e.s.MyServiceImpl               : Accepted message
2023-06-15 10:00:03.381  WARN 33580 --- [ntainer#0-0-C-1] r.v.u.e.s.MyServiceImpl               : Accepted message
2023-06-15 10:00:03.399  WARN 33580 --- [ntainer#0-0-C-1] r.v.u.e.s.MyServiceImpl               : Accepted message
2023-06-15 10:00:03.414  WARN 33580 --- [ntainer#0-0-C-1] r.v.u.e.s.MyServiceImpl               : Accepted message
2023-06-15 10:00:03.430  WARN 33580 --- [ntainer#0-0-C-1] r.v.u.e.s.MyServiceImpl               : Accepted message
2023-06-15 10:00:03.445  WARN 33580 --- [ntainer#0-0-C-1] r.v.u.e.s.MyServiceImpl               : Accepted message
2023-06-15 10:00:03.460  WARN 33580 --- [ntainer#0-0-C-1] r.v.u.e.s.MyServiceImpl               : Accepted message 
2023-06-15 10:00:03.477  WARN 33580 --- [ntainer#0-0-C-1] r.v.u.e.s.MyServiceImpl               : Accepted message
2023-06-15 10:00:03.492  WARN 33580 --- [ntainer#0-0-C-1] r.v.u.e.s.MyServiceImpl               : Accepted message
2023-06-15 10:00:03.509  WARN 33580 --- [ntainer#0-0-C-1] r.v.u.e.s.MyServiceImpl               : Accepted message
2023-06-15 10:00:03.509 ERROR 33580 --- [ntainer#0-0-C-1] o.s.k.l.FallbackBatchErrorHandler        : Records discarded: my-topic-0@33

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void com.****.listenForMessage(java.lang.String)' threw exception; nested exception is java.lang.RuntimeException: some error; nested exception is java.lang.RuntimeException: Ololo
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2871) ~[spring-kafka-2.9.7.jar:2.9.7]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchOnMessage(KafkaMessageListenerContainer.java:2407) ~[spring-kafka-2.9.7.jar:2.9.7]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchOnMessageWithRecordsOrList(KafkaMessageListenerContainer.java:2377) ~[spring-kafka-2.9.7.jar:2.9.7]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.lambda$invokeBatchErrorHandler$46(KafkaMessageListenerContainer.java:2419) ~[spring-kafka-2.9.7.jar:2.9.7]
    at org.springframework.kafka.listener.ErrorHandlingUtils.retryBatch(ErrorHandlingUtils.java:185) ~[spring-kafka-2.9.7.jar:2.9.7]
    at org.springframework.kafka.listener.FallbackBatchErrorHandler.handle(FallbackBatchErrorHandler.java:131) ~[spring-kafka-2.9.7.jar:2.9.7]
    at org.springframework.kafka.listener.ErrorHandlerAdapter.handleBatch(ErrorHandlerAdapter.java:160) ~[spring-kafka-2.9.7.jar:2.9.7]
    at org.springframework.kafka.listener.FailedBatchProcessor.fallback(FailedBatchProcessor.java:191) ~[spring-kafka-2.9.7.jar:2.9.7]
    at org.springframework.kafka.listener.FailedBatchProcessor.handle(FailedBatchProcessor.java:159) ~[spring-kafka-2.9.7.jar:2.9.7]
    at org.springframework.kafka.listener.FailedBatchProcessor.doHandle(FailedBatchProcessor.java:150) ~[spring-kafka-2.9.7.jar:2.9.7]
    at org.springframework.kafka.listener.DefaultErrorHandler.handleBatch(DefaultErrorHandler.java:182) ~[spring-kafka-2.9.7.jar:2.9.7]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchErrorHandler(KafkaMessageListenerContainer.java:2417) ~[spring-kafka-2.9.7.jar:2.9.7]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchListener(KafkaMessageListenerContainer.java:2234) ~[spring-kafka-2.9.7.jar:2.9.7]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchListener(KafkaMessageListenerContainer.java:2095) ~[spring-kafka-2.9.7.jar:2.9.7]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:2074) ~[spring-kafka-2.9.7.jar:2.9.7]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1429) ~[spring-kafka-2.9.7.jar:2.9.7]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1393) ~[spring-kafka-2.9.7.jar:2.9.7]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1290) ~[spring-kafka-2.9.7.jar:2.9.7]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) ~[?:?]
    at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264) ~[?:?]
    at java.util.concurrent.FutureTask.run(FutureTask.java) ~[?:?]
    at java.lang.Thread.run(Thread.java:833) ~[?:?]
    Suppressed: org.springframework.kafka.listener.ListenerExecutionFailedException: Restored Stack Trace
        at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:363) ~[spring-kafka-2.9.7.jar:2.9.7]
        at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.invoke(BatchMessagingMessageListenerAdapter.java:180) ~[spring-kafka-2.9.7.jar:2.9.7]
        at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.onMessage(BatchMessagingMessageListenerAdapter.java:172) ~[spring-kafka-2.9.7.jar:2.9.7]
        at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.onMessage(BatchMessagingMessageListenerAdapter.java:61) ~[spring-kafka-2.9.7.jar:2.9.7]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchOnMessage(KafkaMessageListenerContainer.java:2387) ~[spring-kafka-2.9.7.jar:2.9.7]
Caused by: java.lang.RuntimeException: some error
    at ***(MyServiceImpl.java:59) ~[classes/:?]
    at ***.listenForMessage(MyServiceImpl.java:39) ~[classes/:?]
    at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
    at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) ~[?:?]
    at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
    at java.lang.reflect.Method.invoke(Method.java:568) ~[?:?]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:169) ~[spring-messaging-5.3.26.jar:5.3.26]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:119) ~[spring-messaging-5.3.26.jar:5.3.26]
    at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:56) ~[spring-kafka-2.9.7.jar:2.9.7]
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:339) ~[spring-kafka-2.9.7.jar:2.9.7]
    at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.invoke(BatchMessagingMessageListenerAdapter.java:180) ~[spring-kafka-2.9.7.jar:2.9.7]
    at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.onMessage(BatchMessagingMessageListenerAdapter.java:172) ~[spring-kafka-2.9.7.jar:2.9.7]
    at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.onMessage(BatchMessagingMessageListenerAdapter.java:61) ~[spring-kafka-2.9.7.jar:2.9.7]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchOnMessage(KafkaMessageListenerContainer.java:2387) ~[spring-kafka-2.9.7.jar:2.9.7]

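We can see that the listener tries to accept the message 10 times, and after failure #10 it prints an error (and, as far as I understand, sends an ACK). I want to change this behavior.
I tried implementing the following errorHandler to reduce the number of attempts to 3: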

@Bean
public KafkaListenerErrorHandler eh() {
    return (msg, ex) -> {
        if (msg.getHeaders().get(KafkaHeaders.DELIVERY_ATTEMPT, Integer.class) > 3) {
            return "FAILED";
        }
        throw ex;
    };
}

and referenced it:

@KafkaListener(topics = "${some_topic}", autoStartup = "true", errorHandler = "eh")
public void listenForMessage(String message) {
    log.warn("Accepted message");
    if (true) {
        throw new RuntimeException("some error");
    }
}

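But msg.getHeaders().get(KafkaHeaders.DELIVERY_ATTEMPT, Integer.class) returns null, so the comparison throws an exception. The behavior is therefore the same: 10 attempts to accept the message, and then I see the exception: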

...
Caused by: java.lang.NullPointerException: Cannot invoke "java.lang.Integer.intValue()" because the return value of "org.springframework.messaging.MessageHeaders.get(Object, java.lang.Class)" is null

As far as I know, there is a way to mark some exceptions as retryable and others as not retryable. Could you provide a configuration example?


63lcw9qa1#

See the documentation: https://docs.spring.io/spring-kafka/docs/current/reference/html/#annotation-error-handling
and/or
https://docs.spring.io/spring-kafka/docs/current/reference/html/#default-eh
Add a DefaultErrorHandler with a different FixedBackOff (the default is 9 retries, i.e. 10 delivery attempts, with no back-off between attempts).

DefaultErrorHandler errorHandler =
    new DefaultErrorHandler((record, exception) -> {
        // recover after 3 failures, with no back off - e.g. send to a dead-letter topic
    }, new FixedBackOff(0L, 2L));
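To address the retryable vs. non-retryable part of the question, here is a minimal sketch of registering such a handler as a bean. It assumes a Spring Boot setup in which a CommonErrorHandler bean is applied to the auto-configured listener container factory (the question does not show its configuration, so this is an assumption). The class name KafkaErrorHandlingConfig and the choice of IllegalArgumentException are illustrative only.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

// Hypothetical configuration class; the name is not from the original post.
@Configuration
public class KafkaErrorHandlingConfig {

    @Bean
    public DefaultErrorHandler errorHandler() {
        DefaultErrorHandler handler = new DefaultErrorHandler(
                // Recoverer: called once retries are exhausted (or skipped).
                (record, exception) -> {
                    // e.g. log the record or publish it to a dead-letter topic
                },
                // 2 retries after the first delivery = 3 attempts, no delay
                new FixedBackOff(0L, 2L));

        // Illustrative choice: these exceptions go straight to the recoverer
        // without being retried.
        handler.addNotRetryableExceptions(IllegalArgumentException.class);
        return handler;
    }
}
```

With this in place, a listener that throws IllegalArgumentException should be invoked only once for that record, while other exceptions are delivered up to three times before the recoverer runs. If you define your own container factory instead of relying on auto-configuration, the handler has to be set on that factory explicitly.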

To read the delivery attempt header, you must enable it; it is not enabled by default.
https://docs.spring.io/spring-kafka/docs/current/reference/html/#delivery-header


rfbsl7qr2#

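Summarizing the comments and adding an example.
The DELIVERY_ATTEMPT header does not work because you need to enable it: set the container property deliveryAttemptHeader to true (see the docs at docs.spring.io/spring-kafka/reference/html/#delivery-header). To do that, you need to create a KafkaListenerContainerFactory bean: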

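import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

@Configuration
@EnableKafka
public class KafkaConfig {

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>>
                        kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory()); // consumerFactory() bean is not shown here
        // Populate the KafkaHeaders.DELIVERY_ATTEMPT header (disabled by default)
        factory.getContainerProperties().setDeliveryAttemptHeader(true);
        return factory;
    }
}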

For an example of the remaining beans (such as consumerFactory()), see https://docs.spring.io/spring-kafka/reference/html/#record-listener. Then, in the listener, you do:

@KafkaListener(topics = "${some_topic}", autoStartup = "true", containerFactory = "kafkaListenerContainerFactory")
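The NullPointerException in the question comes from unboxing a missing header, so it may be worth reading the header defensively even after enabling it. Below is a sketch of a null-safe variant of the question's eh bean. The class name ListenerErrorHandlerConfig is hypothetical, and the threshold of 3 is taken from the asker's stated goal of three attempts.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.listener.KafkaListenerErrorHandler;
import org.springframework.kafka.support.KafkaHeaders;

// Hypothetical configuration class; the name is not from the original post.
@Configuration
public class ListenerErrorHandlerConfig {

    @Bean
    public KafkaListenerErrorHandler eh() {
        return (msg, ex) -> {
            // The header is null unless deliveryAttemptHeader is enabled on the container.
            Integer attempt = msg.getHeaders().get(KafkaHeaders.DELIVERY_ATTEMPT, Integer.class);
            if (attempt != null && attempt >= 3) {
                // Swallow the failure on the third delivery so the record is not retried again.
                return "FAILED";
            }
            // Otherwise rethrow so the container's error handler can retry.
            throw ex;
        };
    }
}
```

Note that this only shortens the retry sequence if the container-level error handler allows at least three deliveries. If the header is still absent, the handler simply rethrows and the container's own retry limit applies.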

And, of course, also have a look at Gary Russell's answer.
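Putting the two answers together, a container factory could both enable the header and limit the retries. The following is a sketch under assumptions: the class name CombinedKafkaConfig is hypothetical, and a ConsumerFactory<String, String> bean is assumed to be defined elsewhere in the application (for instance the consumerFactory() mentioned above).

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

// Hypothetical configuration class combining both answers.
@Configuration
public class CombinedKafkaConfig {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory) { // assumed to be defined elsewhere
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);

        // Make KafkaHeaders.DELIVERY_ATTEMPT available to the listener.
        factory.getContainerProperties().setDeliveryAttemptHeader(true);

        // 2 retries after the first delivery = 3 attempts in total, no delay between them.
        factory.setCommonErrorHandler(new DefaultErrorHandler(new FixedBackOff(0L, 2L)));
        return factory;
    }
}
```

A listener that references this factory through containerFactory = "kafkaListenerContainerFactory" should then see at most three deliveries per failing record, without needing a KafkaListenerErrorHandler at all.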
