使用恢复回调机制手动提交

6fe3ivhb  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(390)

我正在建立一个Kafka消费者。我已经设置了类似于下面的恢复回调。我已启用手动提交。如何在恢复回调方法中确认消息,以便没有延迟。

@Bean
    public ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConcurrency(conncurrency);
        factory.setConsumerFactory(consumerFactory());
        factory.setRetryTemplate(retryTemplate());
                factory.setRecoveryCallback(new RecoveryCallback<Object>() {
        @Override
        public Object recover(RetryContext context) throws Exception {
            // TODO Auto-generated method stub
            logger.debug(" In recovery callback method !!");
            return null;
        }
    });
        factory.getContainerProperties().setAckMode(AckMode.MANUAL);
        return factory;
    }

    /*
     * Retry template.
     */

    protected RetryPolicy retryPolicy() {
        SimpleRetryPolicy policy = new SimpleRetryPolicy(maxRetryAttempts, retryableExceptions);
        return policy;
    }

    protected BackOffPolicy backOffPolicy() {
        ExponentialBackOffPolicy policy = new ExponentialBackOffPolicy();
        policy.setInitialInterval(initialRetryInterval);
        policy.setMultiplier(retryMultiplier);
        return policy;
    }

    protected RetryTemplate retryTemplate() {
       RetryTemplate template = new RetryTemplate();
       template.setRetryPolicy(retryPolicy());
       template.setBackOffPolicy(backOffPolicy());
       return template;
    }
}
xv8emn3q

xv8emn3q1#

你的问题太宽泛了。你需要更具体一些。
在这个框架中没有任何假设,如果在消费错误期间重试耗尽,您可以做什么。
我认为您应该从spring重试项目开始了解这是什么 RecoveryCallback 以及它的工作原理:
如果在模板决定中止之前业务逻辑没有成功,那么客户机就有机会通过恢复回调执行一些替代处理。
RetryContext 有:

/**
 * Accessor for the exception object that caused the current retry.
 * 
 * @return the last exception that caused a retry, or possibly null. It will be null
 * if this is the first attempt, but also if the enclosing policy decides not to
 * provide it (e.g. because of concerns about memory usage).
 */
Throwable getLastThrowable();

此外,SpringKafka填充了附加属性 RetryContext 在工作中处理 RecoveryCallback : https://docs.spring.io/spring-kafka/docs/2.0.0.release/reference/html/_reference.html#_retrying_deliveries
文件的内容 RetryContext 传入 RecoveryCallback 将取决于侦听器的类型。上下文总是有一个属性 record 它是发生故障的记录。如果您的侦听器是确认和/或消费者感知的,那么其他属性 acknowledgment 和/或 consumer 将提供。为了方便起见 RetryingAcknowledgingMessageListenerAdapter 为这些键提供静态常量。有关更多信息,请参阅其javadocs。

相关问题