我正在尝试配置我的SpringAMQPListenerContainer
,以允许某种类型的重试流,该重试流与我正在处理的项目中以前使用的自定义rabbit客户端向后兼容。
协议工作方式如下:
1.在信道上接收消息。
1.如果处理失败,消息将被否定,并将重新发布标志设置为false
1.将具有附加/更新的标头(重试计数器)的消息副本发布到同一队列
标头用于过滤传入的消息,但这在这里并不重要。
我希望这种行为是在选择性加入的基础上发生的,这样在与旧客户机的兼容性不是问题的情况下,可以使用更标准化的Spring重试流,并且侦听器应该能够在不需要手动备份的情况下工作。
我已经实现了一个可行的解决方案,我将在下面回到这个解决方案。我正在努力的地方是,在向容器发出它应该nack当前消息的信号之后发布新消息,因为我真的找不到任何在nack之后或在下一个消息之前的好钩子。
阅读文档时,我感觉我在寻找类似于RepublishMessageRecoverer
的行为,它被用作重试拦截器的最后一步。在我的案例中,主要的区别是我需要在失败时立即重新发布,而不是作为最后的恢复步骤。我试图查看RepublishMessageRecoverer
的实现,但是许多层的间接使我很难理解重新发布是在哪里触发的,以及在此之前是否有nack。
我的工作实现如下所示。注意,我使用的是AfterThrowsAdvice
,但我认为错误处理程序也可以使用几乎相同的逻辑。
/*
MyConfig.class, configuring the container factory
*/
@Configuration
public class MyConfig {
@Bean
// NB: bean name is important, overwrites autoconfigured bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
ConnectionFactory connectionFactory,
Jackson2JsonMessageConverter messageConverter,
RabbitTemplate rabbitTemplate
) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(messageConverter);
// AOP
var a1 = new CustomHeaderInspectionAdvice();
var a2 = new MyThrowsAdvice(rabbitTemplate);
Advice[] adviceChain = {a1, a2};
factory.setAdviceChain(adviceChain);
return factory;
}
}
/*
MyThrowsAdvice.class, hooking into the exception flow from the listener
*/
public class MyThrowsAdvice implements ThrowsAdvice {
private static final Logger logger = LoggerFactory.getLogger(MyThrowsAdvice2.class);
private final AmqpTemplate amqpTemplate;
public MyThrowsAdvice2(AmqpTemplate amqpTemplate) {
this.amqpTemplate = amqpTemplate;
}
public void afterThrowing(Method method, Object[] args, Object target, ListenerExecutionFailedException ex) {
var message = message(args);
var cause = ex.getCause();
// opt-in to old protocol by throwing an instance of BusinessException in business logic
if (cause instanceof BusinessException) {
/*
NB: Since we want to trigger execution after the current method fails
with an exception we need to schedule it in another thread and delay
execution until the nack has happened.
*/
new Thread(() -> {
try {
Thread.sleep(1000L);
var messageProperties = message.getMessageProperties();
var count = getCount(messageProperties);
messageProperties.setHeader("xb-count", count + 1);
var routingKey = messageProperties.getReceivedRoutingKey();
var exchange = messageProperties.getReceivedExchange();
amqpTemplate.send(exchange, routingKey, message);
logger.info("Sent!");
} catch (InterruptedException e) {
logger.error("Sleep interrupted", e);
}
}).start();
// NB: Produce the desired nack.
throw new AmqpRejectAndDontRequeueException("Business logic exception, message will be re-queued with updated headers", cause);
}
}
private static long getCount(MessageProperties messageProperties) {
try {
Long c = messageProperties.getHeader("xb-count");
return c == null ? 0 : c;
} catch (Exception e) {
return 0;
}
}
private static Message message(Object[] args) {
try {
return (Message) args[1];
} catch (Exception e) {
logger.info("Bad cast parse", e);
throw new AmqpRejectAndDontRequeueException(e);
}
}
}
现在,正如您所想象的,我对调度一个具有延迟的新线程的不确定性并不特别满意。
所以我的问题很简单,有没有办法使用ListenerContainer
提供的钩子来产生一个确定性的解决方案?
1条答案
按热度按时间vdzxcuhz1#
您当前的解决方案存在邮件丢失的风险;因为您是在延迟后在不同的线程上发布的。如果服务器在延迟期间崩溃,消息将丢失。
最好立即发布到另一个具有TTL和死信配置的队列,以便将过期消息重新发布回原始队列。
使用
RepublishMessageRecoverer
并将重试次数设置为maxattempts=1
应该可以满足您的需要。