当我停止程序时,如何防止java应用程序丢弃未确认的rabbitmq消息?

e5nqia27  于 2021-06-27  发布在  Java
关注(0)|答案(0)|浏览(505)

我编写了一个使用rabbitmq的java项目。我在调试模式下启动我的spring项目,在命令行上有一个断点 System.out.println(input);ListenerExample.java . 在应用程序的另一部分中,我使用以下代码行将消息发送到此队列: rabbitTemplate.convertAndSend(ManagerExample.getTopicExchangeName(), ManagerExample.getRoutingKey(), "test"); 它在我的intellij窗口中点击断点,使用rabbitmq管理器我可以看到 queue1 有一条未确认的消息,没有其他消息。如果我停止程序 queue1 不包含任何消息,并且我会丢失消息(它不会在队列上移动到就绪状态)。如果我引入死信队列或改变 AcknowledgementMode 那我还是会失去信息。我怎样才能以某种方式将此消息保留在另一个队列或其他队列中 queue1 ?
listenerexample.java:

package rabbitExample;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;

import java.io.IOException;

@Component
public class ListenerExample {
  @RabbitListener(queues = ManagerExample.queueName, containerFactory = "prefetchOneRabbitListenerContainerFactory")
  public static void listen(final String input, final Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) final long tag) throws IOException {
    System.out.println(input);
    channel.basicAck(tag, false);
  }
}

managerexample.java文件:

package rabbitExample;

import lombok.Getter;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

@Component
public class ManagerExample {
  @Getter
  static final String topicExchangeName = "exchange";
  @Getter
  static final String queueName = "queue1";
  @Getter
  static final String deadLetterQueueName = "dropped";
  @Getter
  static final String routingKey = "key";

  @Bean
  Queue queue() {
    return QueueBuilder.durable(queueName)
      .build();
  }

  @Bean
  static TopicExchange exchange() {
    return new TopicExchange(topicExchangeName);
  }

  @Bean
  Binding binding(final Queue queue, final TopicExchange exchange) {
    return BindingBuilder.bind(queue).to(exchange).with(routingKey);
  }

  @Bean
  public RabbitListenerContainerFactory<SimpleMessageListenerContainer> prefetchOneRabbitListenerContainerFactory(final ConnectionFactory rabbitConnectionFactory) {
    final SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(rabbitConnectionFactory);
    factory.setPrefetchCount(1);
    factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
    return factory;
  }
}

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题