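I wrote a Java project that uses RabbitMQ. I start my Spring project in debug mode with a breakpoint on the line System.out.println(input); in ListenerExample.java. In another part of the application, I send a message to this queue with the following line: rabbitTemplate.convertAndSend(ManagerExample.getTopicExchangeName(), ManagerExample.getRoutingKey(), "test");
The breakpoint is hit in my IntelliJ window, and in the RabbitMQ management UI I can see that queue1 has one unacked message and no other messages. If I stop the program, queue1 contains no messages and the message is lost (it does not move back to the Ready state on the queue). If I introduce a dead-letter queue or change the AcknowledgeMode, I still lose the message. How can I keep this message somehow, either in queue1 or in another queue?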
ListenerExample.java:
package rabbitExample;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

import java.io.IOException;

@Component
public class ListenerExample {
    @RabbitListener(queues = ManagerExample.queueName, containerFactory = "prefetchOneRabbitListenerContainerFactory")
    public static void listen(final String input, final Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) final long tag) throws IOException {
        System.out.println(input); // the breakpoint is set on this line
        channel.basicAck(tag, false); // manually ack this single delivery
    }
}
ManagerExample.java:
package rabbitExample;

import lombok.Getter;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

@Component
public class ManagerExample {
    @Getter
    static final String topicExchangeName = "exchange";
    @Getter
    static final String queueName = "queue1";
    @Getter
    static final String deadLetterQueueName = "dropped";
    @Getter
    static final String routingKey = "key";

    @Bean
    Queue queue() {
        return QueueBuilder.durable(queueName)
                .build();
    }

    @Bean
    static TopicExchange exchange() {
        return new TopicExchange(topicExchangeName);
    }

    @Bean
    Binding binding(final Queue queue, final TopicExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(routingKey);
    }

    @Bean
    public RabbitListenerContainerFactory<SimpleMessageListenerContainer> prefetchOneRabbitListenerContainerFactory(final ConnectionFactory rabbitConnectionFactory) {
        final SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(rabbitConnectionFactory);
        factory.setPrefetchCount(1);
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        return factory;
    }
}
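For reference, the dead-letter setup I mention trying could look roughly like the sketch below. It is only an illustration, not the exact code I ran: the class name DeadLetterConfigSketch and the bean names queueWithDlx and deadLetterQueue are hypothetical, and the first bean is assumed to take the place of the queue() bean in ManagerExample. Note that RabbitMQ only dead-letters a message when it is rejected or nacked with requeue=false, when it expires, or when the queue exceeds a length limit. A message that is merely unacked when the consumer disconnects is requeued by the broker, not dead-lettered.

```java
package rabbitExample;

import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Hypothetical configuration class, shown only to illustrate the idea.
@Configuration
public class DeadLetterConfigSketch {

    // Assumed replacement for ManagerExample.queue(): messages that are
    // rejected/nacked with requeue=false are re-published through the
    // default exchange ("") using the routing key "dropped".
    @Bean
    Queue queueWithDlx() {
        return QueueBuilder.durable("queue1")
                .withArgument("x-dead-letter-exchange", "")
                .withArgument("x-dead-letter-routing-key", "dropped")
                .build();
    }

    // The default exchange routes by queue name, so a queue named
    // "dropped" receives the dead-lettered messages.
    @Bean
    Queue deadLetterQueue() {
        return QueueBuilder.durable("dropped").build();
    }
}
```

With this in place, a listener would have to call channel.basicNack(tag, false, false) or channel.basicReject(tag, false) for the message to end up in dropped. If queue1 already exists on the broker without these arguments, it has to be deleted first, because redeclaring a queue with different arguments fails with PRECONDITION_FAILED.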