这是我的代码,channel.addConfirmListener()ackCallback有些回调会丢失,消息确实发送到了rabbitmq服务器,可以正常使用,但是我在发送消息后休眠了2ms,所有的ack回调都可以收到,
我不知道这是我的代码中的错误还是rabbitmq错误
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import lombok.extern.log4j.Log4j2;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
@Log4j2
public class 异步确认发布{
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("");
connectionFactory.setPort(7005);
connectionFactory.setUsername("");
connectionFactory.setPassword("");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
// 开启确认发布
AMQP.Confirm.SelectOk selectOk = channel.confirmSelect();
channel.queueDeclare("hello", true, false, false, null);
// 异步确认发布消息 回调
channel.addConfirmListener(
(deliveryTag, multiple) -> {
log.info("消息deliveryTag=>{}, send successful", deliveryTag);
},
(deliveryTag, multiple) -> {
log.info("消息deliveryTag=>{}, fail in send", deliveryTag);
}
);
for (int i = 0; i < 5; i++) {
String message = "Hello World!!! " + i;
channel.basicPublish("", "hello", null, message.getBytes(StandardCharsets.UTF_8));
}
}
}
控制台显示缺少一些回调
17:04:29.607 [AMQP Connection 27.11.210.232:7005] INFO me.demo.me.rabbitmq.consumer.发布确认.异步确认发布 - ackCallback, deliveryTag=>4, send successful
17:04:29.615 [AMQP Connection 27.11.210.232:7005] INFO me.demo.me.rabbitmq.consumer.发布确认.异步确认发布 - ackCallback, deliveryTag=>5, send successful
但我发完消息后休眠2ms,所有回调都能收到
示例性代码
for (int i = 0; i < 5; i++) {
String message = "Hello World!!! " + i;
channel.basicPublish("", "hello", null, message.getBytes(StandardCharsets.UTF_8));
Thread.sleep(2); // I sleep for 2ms after sending the message, and all ack callbacks can be received
}
控制台日志
17:05:18.037 [AMQP Connection 27.11.210.232:7005] INFO me.demo.me.rabbitmq.consumer.发布确认.异步确认发布 - ackCallback, deliveryTag=>1, send successful
17:05:18.043 [AMQP Connection 27.11.210.232:7005] INFO me.demo.me.rabbitmq.consumer.发布确认.异步确认发布 - ackCallback, deliveryTag=>2, send successful
17:05:18.043 [AMQP Connection 27.11.210.232:7005] INFO me.demo.me.rabbitmq.consumer.发布确认.异步确认发布 - ackCallback, deliveryTag=>3, send successful
17:05:18.043 [AMQP Connection 27.11.210.232:7005] INFO me.demo.me.rabbitmq.consumer.发布确认.异步确认发布 - ackCallback, deliveryTag=>4, send successful
17:05:18.043 [AMQP Connection 27.11.210.232:7005] INFO me.demo.me.rabbitmq.consumer.发布确认.异步确认发布 - ackCallback, deliveryTag=>5, send successful
我的RabbitMQ服务器版本是3.9.14(未修改任何配置。使用默认配置)、Erlang 24.3.2、
中的Maven项目依赖关系
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.2.18.RELEASE</version>
</dependency>
我试图阻止主线程关闭,但这似乎不是主线程关闭的原因,因为一旦创建了连接,主线程就不会自动关闭
1条答案
按热度按时间ltqd579y1#
我不知道为什么要用spring-rabbit标记它,因为您根本没有使用spring-rabbit API;您将直接使用amqp客户端。
这是按设计工作的;出于性能原因,当为真时,confirm回调具有额外的参数
multiple
;这意味着直到并包括该标签的所有标签都用单个确认来确认。https://www.rabbitmq.com/tutorials/tutorial-seven-java.html
multiple
:这是一个布尔值。如果为false,则只确认/否定一条消息,如果为true,则确认/否定所有序列号小于或等于该序列号的消息。