我正在建立一个Kafka消费者。我已经设置了类似于下面的恢复回调。我已启用手动提交。如何在恢复回调方法中确认消息,以便没有延迟。
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConcurrency(conncurrency);
factory.setConsumerFactory(consumerFactory());
factory.setRetryTemplate(retryTemplate());
factory.setRecoveryCallback(new RecoveryCallback<Object>() {
@Override
public Object recover(RetryContext context) throws Exception {
// TODO Auto-generated method stub
logger.debug(" In recovery callback method !!");
return null;
}
});
factory.getContainerProperties().setAckMode(AckMode.MANUAL);
return factory;
}
/*
* Retry template.
*/
protected RetryPolicy retryPolicy() {
SimpleRetryPolicy policy = new SimpleRetryPolicy(maxRetryAttempts, retryableExceptions);
return policy;
}
protected BackOffPolicy backOffPolicy() {
ExponentialBackOffPolicy policy = new ExponentialBackOffPolicy();
policy.setInitialInterval(initialRetryInterval);
policy.setMultiplier(retryMultiplier);
return policy;
}
protected RetryTemplate retryTemplate() {
RetryTemplate template = new RetryTemplate();
template.setRetryPolicy(retryPolicy());
template.setBackOffPolicy(backOffPolicy());
return template;
}
}
1条答案
按热度按时间xv8emn3q1#
你的问题太宽泛了。你需要更具体一些。
在这个框架中没有任何假设,如果在消费错误期间重试耗尽,您可以做什么。
我认为您应该从spring重试项目开始了解这是什么
RecoveryCallback
以及它的工作原理:如果在模板决定中止之前业务逻辑没有成功,那么客户机就有机会通过恢复回调执行一些替代处理。
一
RetryContext
有:此外,SpringKafka填充了附加属性
RetryContext
在工作中处理RecoveryCallback
: https://docs.spring.io/spring-kafka/docs/2.0.0.release/reference/html/_reference.html#_retrying_deliveries文件的内容
RetryContext
传入RecoveryCallback
将取决于侦听器的类型。上下文总是有一个属性record
它是发生故障的记录。如果您的侦听器是确认和/或消费者感知的,那么其他属性acknowledgment
和/或consumer
将提供。为了方便起见RetryingAcknowledgingMessageListenerAdapter
为这些键提供静态常量。有关更多信息,请参阅其javadocs。