我使用的是spring Kafka,我有一个用java spring Boot 编写的kafka消费者。我的消费者使用批处理方式,下面给出了相关的配置bean。
@Bean
public ConsumerFactory<String, Object> consumerFactory() {
Map<String, Object> config = new HashMap<>();
// default configs like bootstrap servers, key and value deserializers are here
config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "5");
return new DefaultKafkaConsumerFactory<>(config);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setCommitLogLevel(LogIfLevelEnabled.Level.DEBUG);
factory.setBatchListener(true);
return factory;
}
我使用消息并将这些消息发送到API端点。如果API不可用或如果其余模板引发错误,我希望将整个批处理发送到DLT而不重试。我希望做的是将整个批处理发送到DLT而不重试。如果我们引发BatchListenerFailedException
,则来自批处理的特定索引号所属消息将发送到DLT。在BatchListenerFailedException
中,我们只能传递一个整数值作为索引值,而不能传递一个列表。但是我想要的是将整个批处理直接发送到DLT主题,而不需要重试。有没有办法做到这一点?
我的 Spring Kafka版本是2.8.6
编辑
我的默认错误处理程序如下所示
@Bean
public CommonErrorHandler commonErrorHandler() {
ExponentialBackOffWithMaxRetries exponentialBackOffWithMaxRetries = new ExponentialBackOffWithMaxRetries(5);
exponentialBackOffWithMaxRetries.setInitialInterval(my val);
exponentialBackOffWithMaxRetries.setMultiplier(my val);
exponentialBackOffWithMaxRetries.setMaxInterval(my val);
DefaultErrorHandler errorHandler = new DefaultErrorHandler(
new DeadLetterPublishingRecoverer(kafkaTemplate(),
(record, exception) -> new TopicPartition(record.topic() + "-dlt", record.partition())),
exponentialBackOffWithMaxRetries);
errorHandler.addNotRetryableExceptions(ParseException.class);
errorHandler.addNotRetryableExceptions(EventHubNonRetryableException.class);
return errorHandler;
}
在我的例子中,我使用了ExponentialBackOffWithMaxRetries
而不是固定后退。在我的例子中,我有3个场景。
1 -重试消息并将其发送到DLT(引发除BatchListenerFailedException
之外的任何其他异常)
2 -将批处理中的几条消息发送到DLT而不重试(使用BatchListenerFailedException
)
3 -将整个批次发送到DLT,不重试。
第三个是我挣扎的地方。如果我发送一些其他的异常,那么它将重试几次。(即使我使用FixedBackOff
而不是ExponentialBackOffWithMaxRetries
)
1条答案
按热度按时间ztyzrc3y1#
Throw something else other than
BatchListenerFailedException
; use aDefaultErrorHandler
with aDeadLetterPublishingRecoverer
with no retries (new FixedBackOff(0L, 0L)
).EDIT
Starting with versions 3.0.0, 2.9.3, 2.8.11, you can configure not retryable exceptions for batch errors.
https://github.com/spring-projects/spring-kafka/issues/2459
See
Note that 2.8.x is now out of OSS support. https://spring.io/projects/spring-kafka#support