RabbitMQ通道.addConfirmListener(),接口ackCallback是否缺少某些回调?

0qx6xfy6  于 2022-11-08  发布在  RabbitMQ
关注(0)|答案(1)|浏览(145)

这是我的代码,channel.addConfirmListener()ackCallback有些回调会丢失,消息确实发送到了rabbitmq服务器,可以正常使用,但是我在发送消息后休眠了2ms,所有的ack回调都可以收到,
我不知道这是我的代码中的错误还是rabbitmq错误

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import lombok.extern.log4j.Log4j2;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

@Log4j2
public class 异步确认发布{
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("");
        connectionFactory.setPort(7005);
        connectionFactory.setUsername("");
        connectionFactory.setPassword("");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        // 开启确认发布
        AMQP.Confirm.SelectOk selectOk = channel.confirmSelect();

        channel.queueDeclare("hello", true, false, false, null);
        //  异步确认发布消息 回调
        channel.addConfirmListener(
                (deliveryTag, multiple) -> {
                    log.info("消息deliveryTag=>{}, send successful", deliveryTag);
                },
                (deliveryTag, multiple) -> {
                    log.info("消息deliveryTag=>{}, fail in send", deliveryTag);
                }
        );
        for (int i = 0; i < 5; i++) {
            String message = "Hello World!!!   " + i;
            channel.basicPublish("", "hello", null, message.getBytes(StandardCharsets.UTF_8));
        }
    }
}

控制台显示缺少一些回调

17:04:29.607 [AMQP Connection 27.11.210.232:7005] INFO me.demo.me.rabbitmq.consumer.发布确认.异步确认发布 - ackCallback, deliveryTag=>4, send successful
17:04:29.615 [AMQP Connection 27.11.210.232:7005] INFO me.demo.me.rabbitmq.consumer.发布确认.异步确认发布 - ackCallback, deliveryTag=>5, send successful

但我发完消息后休眠2ms,所有回调都能收到
示例性代码

for (int i = 0; i < 5; i++) {
    String message = "Hello World!!!   " + i;
    channel.basicPublish("", "hello", null, message.getBytes(StandardCharsets.UTF_8));
    Thread.sleep(2);  // I sleep for 2ms after sending the message, and all ack callbacks can be received
}

控制台日志

17:05:18.037 [AMQP Connection 27.11.210.232:7005] INFO me.demo.me.rabbitmq.consumer.发布确认.异步确认发布 - ackCallback, deliveryTag=>1, send successful
17:05:18.043 [AMQP Connection 27.11.210.232:7005] INFO me.demo.me.rabbitmq.consumer.发布确认.异步确认发布 - ackCallback, deliveryTag=>2, send successful
17:05:18.043 [AMQP Connection 27.11.210.232:7005] INFO me.demo.me.rabbitmq.consumer.发布确认.异步确认发布 - ackCallback, deliveryTag=>3, send successful
17:05:18.043 [AMQP Connection 27.11.210.232:7005] INFO me.demo.me.rabbitmq.consumer.发布确认.异步确认发布 - ackCallback, deliveryTag=>4, send successful
17:05:18.043 [AMQP Connection 27.11.210.232:7005] INFO me.demo.me.rabbitmq.consumer.发布确认.异步确认发布 - ackCallback, deliveryTag=>5, send successful

我的RabbitMQ服务器版本是3.9.14(未修改任何配置。使用默认配置)、Erlang 24.3.2、
中的Maven项目依赖关系

<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
    <version>2.2.18.RELEASE</version>
</dependency>

我试图阻止主线程关闭,但这似乎不是主线程关闭的原因,因为一旦创建了连接,主线程就不会自动关闭

ltqd579y

ltqd579y1#

我不知道为什么要用spring-rabbit标记它,因为您根本没有使用spring-rabbit API;您将直接使用amqp客户端。
这是按设计工作的;出于性能原因,当为真时,confirm回调具有额外的参数multiple;这意味着直到并包括该标签的所有标签都用单个确认来确认。
https://www.rabbitmq.com/tutorials/tutorial-seven-java.html
multiple:这是一个布尔值。如果为false,则只确认/否定一条消息,如果为true,则确认/否定所有序列号小于或等于该序列号的消息。

相关问题