如何使用reactor rabbitmq处理死信队列

xdnvmnnf  于 2021-07-03  发布在  Java
关注(0)|答案(1)|浏览(498)

在我们的应用程序中,我们将从springamqp迁移到reactor rabbitmq,以更好地处理应用程序的React性。我们一直在读React堆项目的官方指南。但是,我不太确定,一旦重试失败,如何将消息发送到死信队列。
在前面的实现中,我们抛出spring-ampq提供的amqprejectanddontrequeueexception,这会导致消息自动转到死信队列,而不使用死信队列的专用发布服务器。如何在React堆rabbitmq中进行类似操作?我是否需要编写一个专用的发布服务器,在重试用尽后从侦听器调用它,或者有其他方法来处理它。此外,是否有DLQ和停车队列的官方项目文件。
以下是两个版本的一些代码示例:
amqp版本:

@AllArgsConstructor
public class SampleListener {
    private static final Logger logger = LoggerFactory.getLogger(SampleListener.class);
    private final MessageListenerContainerFactory messageListenerContainerFactory;
    private final Jackson2JsonMessageConverter converter;

    @PostConstruct
    public void subscribe() {
        var mlc = messageListenerContainerFactory
                .createMessageListenerContainer(SAMPLE_QUEUE);
        MessageListener messageListener = message -> {
            try {
                TraceableMessage traceableMessage = (TraceableMessage) converter.fromMessage(message);
                ObjectMapper mapper = new ObjectMapper();
                mapper.registerModule(new JavaTimeModule());
                MyModel myModel = mapper.convertValue(traceableMessage.getMessage(), MyModel.class);
                MDC.put(CORRELATION_ID, traceableMessage.getCorrelationId());
                logger.info("Received message for id : {}", myModel.getId());
                processMessage(myModel)
                        .subscriberContext(ctx -> {
                            Optional<String> correlationId = Optional.ofNullable(MDC.get(CORRELATION_ID));
                            return correlationId.map(id -> ctx.put(CORRELATION_ID, id))
                                    .orElseGet(() -> ctx.put(CORRELATION_ID, UUID.randomUUID().toString()));
                        }).block();
                MDC.clear();
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
                throw new AmqpRejectAndDontRequeueException(e.getMessage(), e);
            }
        };
        mlc.setupMessageListener(messageListener);
        mlc.start();
    }
``` `processMessage` 正在执行业务逻辑,如果失败,我想将其移到dlq。在amqp的情况下可以正常工作。
React堆rabbitmq版本:

@AllArgsConstructor
public class SampleListener {
private static final Logger logger = LoggerFactory.getLogger(SampleListener.class);
private final MessageListenerContainerFactory messageListenerContainerFactory;
private final Jackson2JsonMessageConverter converter;

@PostConstruct
public void subscribe() {
    receiver.consumeAutoAck(SAMPLE_QUEUE)
            .subscribe(delivery -> {
                        TraceableMessage traceableMessage = Serializer.to(delivery.getBody(), TraceableMessage.class);
                        Mono.just(traceableMessage)
                                .map(this::extractMyModel)
                                .doOnNext(myModel -> logger.info("Received message for id : {}", myModel.getId()))
                                .flatMap(this::processMessage)
                                .doFinally(signalType -> MDC.clear())
                                .retryWhen(Retry
                                        .fixedDelay(1, Duration.ofMillis(10000))
                                        .onRetryExhaustedThrow() //Move to DLQ
                                        .doAfterRetry(retrySignal -> {
                                            if ((retrySignal.totalRetries() + 1) >= 1) {
                                                logger.info("Exhausted retries");
                                                //Move to DLQ
                                            }
                                        }))
                                .subscriberContext(ctx -> ctx.put(CORRELATION_ID, traceableMessage.getCorrelationId()))
                                .subscribe();
                    }
            );

}
有评论的两个地方之一 `//Move to DLQ` ,我在猜邮件应该发到dlq的什么地方。所以我决定我不能再处理这个了。如果有不同的发布者推送到dlq或任何特定的设置可以自动处理它。
请告诉我。
dwbf0jvd

dwbf0jvd1#

我找到了这个问题的答案。我使用异步侦听器和 consumeAutoAck . 当我转到 consumeManualAck 我得到一个 AcknowledgebleDelivery 在那里我可以做一个 nack(false) 应该把它移到死信队列。

相关问题