在我们的应用程序中,我们将从springamqp迁移到reactor rabbitmq,以更好地处理应用程序的React性。我们一直在读React堆项目的官方指南。但是,我不太确定,一旦重试失败,如何将消息发送到死信队列。
在前面的实现中,我们抛出spring-ampq提供的amqprejectanddontrequeueexception,这会导致消息自动转到死信队列,而不使用死信队列的专用发布服务器。如何在React堆rabbitmq中进行类似操作?我是否需要编写一个专用的发布服务器,在重试用尽后从侦听器调用它,或者有其他方法来处理它。此外,是否有DLQ和停车队列的官方项目文件。
以下是两个版本的一些代码示例:
amqp版本:
@AllArgsConstructor
public class SampleListener {
private static final Logger logger = LoggerFactory.getLogger(SampleListener.class);
private final MessageListenerContainerFactory messageListenerContainerFactory;
private final Jackson2JsonMessageConverter converter;
@PostConstruct
public void subscribe() {
var mlc = messageListenerContainerFactory
.createMessageListenerContainer(SAMPLE_QUEUE);
MessageListener messageListener = message -> {
try {
TraceableMessage traceableMessage = (TraceableMessage) converter.fromMessage(message);
ObjectMapper mapper = new ObjectMapper();
mapper.registerModule(new JavaTimeModule());
MyModel myModel = mapper.convertValue(traceableMessage.getMessage(), MyModel.class);
MDC.put(CORRELATION_ID, traceableMessage.getCorrelationId());
logger.info("Received message for id : {}", myModel.getId());
processMessage(myModel)
.subscriberContext(ctx -> {
Optional<String> correlationId = Optional.ofNullable(MDC.get(CORRELATION_ID));
return correlationId.map(id -> ctx.put(CORRELATION_ID, id))
.orElseGet(() -> ctx.put(CORRELATION_ID, UUID.randomUUID().toString()));
}).block();
MDC.clear();
} catch (Exception e) {
logger.error(e.getMessage(), e);
throw new AmqpRejectAndDontRequeueException(e.getMessage(), e);
}
};
mlc.setupMessageListener(messageListener);
mlc.start();
}
``` `processMessage` 正在执行业务逻辑,如果失败,我想将其移到dlq。在amqp的情况下可以正常工作。
React堆rabbitmq版本:
@AllArgsConstructor
public class SampleListener {
private static final Logger logger = LoggerFactory.getLogger(SampleListener.class);
private final MessageListenerContainerFactory messageListenerContainerFactory;
private final Jackson2JsonMessageConverter converter;
@PostConstruct
public void subscribe() {
receiver.consumeAutoAck(SAMPLE_QUEUE)
.subscribe(delivery -> {
TraceableMessage traceableMessage = Serializer.to(delivery.getBody(), TraceableMessage.class);
Mono.just(traceableMessage)
.map(this::extractMyModel)
.doOnNext(myModel -> logger.info("Received message for id : {}", myModel.getId()))
.flatMap(this::processMessage)
.doFinally(signalType -> MDC.clear())
.retryWhen(Retry
.fixedDelay(1, Duration.ofMillis(10000))
.onRetryExhaustedThrow() //Move to DLQ
.doAfterRetry(retrySignal -> {
if ((retrySignal.totalRetries() + 1) >= 1) {
logger.info("Exhausted retries");
//Move to DLQ
}
}))
.subscriberContext(ctx -> ctx.put(CORRELATION_ID, traceableMessage.getCorrelationId()))
.subscribe();
}
);
}
有评论的两个地方之一 `//Move to DLQ` ,我在猜邮件应该发到dlq的什么地方。所以我决定我不能再处理这个了。如果有不同的发布者推送到dlq或任何特定的设置可以自动处理它。
请告诉我。
1条答案
按热度按时间dwbf0jvd1#
我找到了这个问题的答案。我使用异步侦听器和
consumeAutoAck
. 当我转到consumeManualAck
我得到一个AcknowledgebleDelivery
在那里我可以做一个nack(false)
应该把它移到死信队列。