Kafka: send the entire batch to the DLT without retrying

k10s72fa · posted 2022-12-11 in Apache

I am using Spring Kafka with a Kafka consumer written in Java Spring Boot. My consumer consumes messages in batch mode; the relevant configuration beans are given below.

@Bean
  public ConsumerFactory<String, Object> consumerFactory() {
    Map<String, Object> config = new HashMap<>();
 
    // default configs like bootstrap servers, key and value deserializers are here

    config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "5");
    return new DefaultKafkaConsumerFactory<>(config);
  }

  @Bean
  public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Object> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.getContainerProperties().setCommitLogLevel(LogIfLevelEnabled.Level.DEBUG);
    factory.setBatchListener(true);
    return factory;
  }

I consume the messages and forward them to an API endpoint. If the API is unavailable, or if the RestTemplate throws an error, I want to send the entire batch to the DLT without retrying. If I throw a BatchListenerFailedException, only the message at the given index within the batch is sent to the DLT; BatchListenerFailedException accepts a single integer index, not a list. What I want is to send the whole batch directly to the DLT topic, with no retries. Is there a way to do this?
My Spring Kafka version is 2.8.6.

EDIT

My default error handler looks like this:

@Bean
  public CommonErrorHandler commonErrorHandler() {

    ExponentialBackOffWithMaxRetries exponentialBackOffWithMaxRetries = new ExponentialBackOffWithMaxRetries(5);
    exponentialBackOffWithMaxRetries.setInitialInterval(my val);
    exponentialBackOffWithMaxRetries.setMultiplier(my val);
    exponentialBackOffWithMaxRetries.setMaxInterval(my val);

    DefaultErrorHandler errorHandler = new DefaultErrorHandler(
            new DeadLetterPublishingRecoverer(kafkaTemplate(),
                    (record, exception) -> new TopicPartition(record.topic() + "-dlt", record.partition())),
            exponentialBackOffWithMaxRetries);
    errorHandler.addNotRetryableExceptions(ParseException.class);
    errorHandler.addNotRetryableExceptions(EventHubNonRetryableException.class);
    return errorHandler;
  }

In my case I use ExponentialBackOffWithMaxRetries instead of a fixed back-off, and I have three scenarios:
1 - retry the messages and then send them to the DLT (throw any exception other than BatchListenerFailedException)
2 - send a few messages from the batch to the DLT without retrying (using BatchListenerFailedException)
3 - send the whole batch to the DLT without retrying.
The third one is where I am struggling. If I throw any other exception, the batch is retried several times (even when I use FixedBackOff instead of ExponentialBackOffWithMaxRetries).


ztyzrc3y1#

Throw something other than BatchListenerFailedException; use a DefaultErrorHandler with a DeadLetterPublishingRecoverer and no retries (new FixedBackOff(0L, 0L)).
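A minimal sketch of that handler, reusing the kafkaTemplate() bean and the "-dlt" topic-naming convention from the question (the bean name noRetryErrorHandler is illustrative; this would live in the same @Configuration class as the other beans):

```java
import org.apache.kafka.common.TopicPartition;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.listener.CommonErrorHandler;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

  // Sketch, assuming the existing kafkaTemplate() bean from the question.
  // FixedBackOff(0L, 0L) means zero delay and zero retry attempts, so the
  // recoverer runs immediately: any exception thrown from the batch listener
  // (other than BatchListenerFailedException) publishes the whole failed
  // batch to the DLT without a single retry.
  @Bean
  public CommonErrorHandler noRetryErrorHandler() {
    DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(
            kafkaTemplate(),
            (record, exception) -> new TopicPartition(record.topic() + "-dlt", record.partition()));
    return new DefaultErrorHandler(recoverer, new FixedBackOff(0L, 0L));
  }
```

The trade-off is that this handler applies the no-retry policy to every exception, which conflicts with scenario 1 above; the edit below addresses selecting behavior per exception type.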

EDIT

Starting with versions 3.0.0, 2.9.3 and 2.8.11, you can configure not-retryable exceptions for batch errors.
See https://github.com/spring-projects/spring-kafka/issues/2459 and:

/**
 * Add exception types to the default list. By default, the following exceptions will
 * not be retried:
 * <ul>
 * <li>{@link DeserializationException}</li>
 * <li>{@link MessageConversionException}</li>
 * <li>{@link ConversionException}</li>
 * <li>{@link MethodArgumentResolutionException}</li>
 * <li>{@link NoSuchMethodException}</li>
 * <li>{@link ClassCastException}</li>
 * </ul>
 * All others will be retried, unless {@link #defaultFalse()} has been called.
 * @param exceptionTypes the exception types.
 * @see #removeClassification(Class)
 * @see #setClassifications(Map, boolean)
 */
@SafeVarargs
@SuppressWarnings("varargs")
public final void addNotRetryableExceptions(Class<? extends Exception>... exceptionTypes) {
    add(false, exceptionTypes);
    notRetryable(Arrays.stream(exceptionTypes));
}
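Assuming an upgrade to one of those versions, all three scenarios can then be covered by one handler. A sketch, reusing the beans and the EventHubNonRetryableException class from the question (the back-off values are placeholders, as in the original):

```java
import org.apache.kafka.common.TopicPartition;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.listener.CommonErrorHandler;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.kafka.support.ExponentialBackOffWithMaxRetries;

  // Sketch, assuming Spring Kafka >= 2.9.3 (or 2.8.11) and the existing
  // kafkaTemplate() bean. Exceptions not listed below are retried with the
  // exponential back-off and then dead-lettered (scenario 1);
  // BatchListenerFailedException dead-letters individual records
  // (scenario 2); EventHubNonRetryableException is classified not-retryable,
  // which these versions also honor for batch errors, so throwing it sends
  // the whole batch to the DLT with no retries (scenario 3).
  @Bean
  public CommonErrorHandler commonErrorHandler() {
    ExponentialBackOffWithMaxRetries backOff = new ExponentialBackOffWithMaxRetries(5);
    DefaultErrorHandler errorHandler = new DefaultErrorHandler(
            new DeadLetterPublishingRecoverer(kafkaTemplate(),
                    (record, ex) -> new TopicPartition(record.topic() + "-dlt", record.partition())),
            backOff);
    errorHandler.addNotRetryableExceptions(EventHubNonRetryableException.class);
    return errorHandler;
  }
```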

Note that 2.8.x is now out of OSS support. https://spring.io/projects/spring-kafka#support
