AMQP Golang(amqp包)消息生产者丢失要发送的消息

cwxwcias  于 2023-03-27  发布在  Go
关注(0)|答案(1)|浏览(277)

我想从文件中发布几条消息。每行有一条消息。一切都运行得很好,但奇怪的事情发生了:如果行很长(几十KB),那么最后一条消息根本不会发布。我发布代码如下(需要mTLS):

func main() {
        flag.Parse()

        cfg := new(tls.Config)
        cfg.RootCAs = x509.NewCertPool()

        caCert, err := os.ReadFile(*caFile)
        failOnError(err, "Unable to read CA bundle")
        cfg.RootCAs.AppendCertsFromPEM(caCert)

        cert, err := tls.LoadX509KeyPair(*certFile, *keyFile)
        failOnError(err, "Unable to read certificate or key")
        cfg.Certificates = append(cfg.Certificates, cert)

        conn, err := amqp.DialTLS(*url, cfg)
        failOnError(err, "Unable to dial with TLS")
        defer conn.Close()

        ch, err := conn.Channel()
        failOnError(err, "Failed to open a channel")

        messages, err := getMessages()
        failOnError(err, "Unable to get messages from file")
        log.Printf("%d messages to publish", len(messages))

        for i, m := range messages {
                log.Printf("publish message, idx: %d, len: %d", i, len(m))
                err = ch.Publish(
                        *exchange,   // exchange
                        *routingKey, // routing key
                        false,
                        false, // immediate
                        amqp.Publishing{
                                ContentType:  "text/plain",
                                Body:         []byte(m),
                                DeliveryMode: amqp.Persistent,
                        })
                failOnError(err, "Failed to publish a message")

        }
        time.Sleep(0 * time.Millisecond) // special reason explained at bottom
}

因此,在阅读文件后,切片“messages”有4个元素。现在我运行一个程序:

$ go run producer-file.go --cert client.cer --key client.key --ca ca-bundle.pem --url amqps://login:password@server:5671/virtualhost --exchange test_exchange --routing-key test_routing_key --file test_03_bundle4.txt
2023/03/21 12:01:02 4 messages to publish
2023/03/21 12:01:02 publish message, idx: 0, len: 169403
2023/03/21 12:01:02 publish message, idx: 1, len: 169148
2023/03/21 12:01:02 publish message, idx: 2, len: 169659
2023/03/21 12:01:02 publish message, idx: 3, len: 86000

到目前为止,一切看起来都很好,但队列中只有3条消息:

$ go run consumer.go --cert client.cer --key client.key --ca ca-bundle.pem --url amqps://login:password@server:5671/virtualhost -p 10 --queue test_queue
2023/03/21 12:01:02 Received a message, len: 169403
2023/03/21 12:01:02 Received a message, len: 169148
2023/03/21 12:01:02 Received a message, len: 169659
2023/03/21 12:01:02 Current speed: 3 msg/s

正如你所看到的,只有idx 0,1和2被发布,Idx = 3没有被发布。我做了一些“改进”,在一个“range messages”循环之后,我把sleep设置为20 ms:

time.Sleep(20 * time.Millisecond)

出版商:

2023/03/21 12:15:11 4 messages to publish
2023/03/21 12:15:11 publish message, idx: 0, len: 169403
2023/03/21 12:15:11 publish message, idx: 1, len: 169148
2023/03/21 12:15:11 publish message, idx: 2, len: 169659
2023/03/21 12:15:11 publish message, idx: 3, len: 86000

消费者:

2023/03/21 12:15:15  [*] Waiting for messages. To exit press CTRL+C
2023/03/21 12:15:15 Received a message, len: 169403
2023/03/21 12:15:15 Received a message, len: 169148
2023/03/21 12:15:15 Received a message, len: 169659
2023/03/21 12:15:15 Received a message, len: 86000
2023/03/21 12:15:15 Current speed: 4 msg/s

现在所有的消息都准备好消费了。我知道发布是异步的,但是-理论上-“defer conn.Close()”应该在所有缓冲区为空之前保护发布者代码不被重置或关闭连接。互联网上的每个例子都非常类似于我的生产者,所以问题是:我做了什么坏的最后一条消息是不发表在所有没有这个奇怪的时间。睡眠()?
其他测试表明,如果要发布的消息很短(~20字节),则在退出程序之前不需要休眠-消息足够小,可以在连接关闭之前及时发送到RabbitMQ服务器。

gk7wooem

gk7wooem1#

任何时候消息发布到RabbitMQ,您都应该使用发布者确认-https://www.rabbitmq.com/confirms.html#publisher-confirms
如果不这样做,就不能确定消息是否真的到达了代理并被路由到队列。
请看这个例子,它展示了如何使用confirms:
https://github.com/rabbitmq/amqp091-go/blob/main/_examples/producer/producer.go
请注意,任何网络协议都必须有某种确认机制,以确保客户端应用程序发送的数据已被正确接收。这不是严格意义上的RabbitMQ问题。

**注意:**Team RabbitMQ监控rabbitmq-users邮件列表,仅在有时回答StackOverflow上的问题。

相关问题