我想从文件中发布几条消息。每行有一条消息。一切都运行得很好,但奇怪的事情发生了:如果行很长(几十KB),那么最后一条消息根本不会发布。我发布代码如下(需要mTLS):
func main() {
flag.Parse()
cfg := new(tls.Config)
cfg.RootCAs = x509.NewCertPool()
caCert, err := os.ReadFile(*caFile)
failOnError(err, "Unable to read CA bundle")
cfg.RootCAs.AppendCertsFromPEM(caCert)
cert, err := tls.LoadX509KeyPair(*certFile, *keyFile)
failOnError(err, "Unable to read certificate or key")
cfg.Certificates = append(cfg.Certificates, cert)
conn, err := amqp.DialTLS(*url, cfg)
failOnError(err, "Unable to dial with TLS")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
messages, err := getMessages()
failOnError(err, "Unable to get messages from file")
log.Printf("%d messages to publish", len(messages))
for i, m := range messages {
log.Printf("publish message, idx: %d, len: %d", i, len(m))
err = ch.Publish(
*exchange, // exchange
*routingKey, // routing key
false,
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(m),
DeliveryMode: amqp.Persistent,
})
failOnError(err, "Failed to publish a message")
}
time.Sleep(0 * time.Millisecond) // special reason explained at bottom
}
因此,在阅读文件后,切片“messages”有4个元素。现在我运行一个程序:
$ go run producer-file.go --cert client.cer --key client.key --ca ca-bundle.pem --url amqps://login:password@server:5671/virtualhost --exchange test_exchange --routing-key test_routing_key --file test_03_bundle4.txt
2023/03/21 12:01:02 4 messages to publish
2023/03/21 12:01:02 publish message, idx: 0, len: 169403
2023/03/21 12:01:02 publish message, idx: 1, len: 169148
2023/03/21 12:01:02 publish message, idx: 2, len: 169659
2023/03/21 12:01:02 publish message, idx: 3, len: 86000
到目前为止,一切看起来都很好,但队列中只有3条消息:
$ go run consumer.go --cert client.cer --key client.key --ca ca-bundle.pem --url amqps://login:password@server:5671/virtualhost -p 10 --queue test_queue
2023/03/21 12:01:02 Received a message, len: 169403
2023/03/21 12:01:02 Received a message, len: 169148
2023/03/21 12:01:02 Received a message, len: 169659
2023/03/21 12:01:02 Current speed: 3 msg/s
正如你所看到的,只有idx 0,1和2被发布,Idx = 3没有被发布。我做了一些“改进”,在一个“range messages”循环之后,我把sleep设置为20 ms:
time.Sleep(20 * time.Millisecond)
出版商:
2023/03/21 12:15:11 4 messages to publish
2023/03/21 12:15:11 publish message, idx: 0, len: 169403
2023/03/21 12:15:11 publish message, idx: 1, len: 169148
2023/03/21 12:15:11 publish message, idx: 2, len: 169659
2023/03/21 12:15:11 publish message, idx: 3, len: 86000
消费者:
2023/03/21 12:15:15 [*] Waiting for messages. To exit press CTRL+C
2023/03/21 12:15:15 Received a message, len: 169403
2023/03/21 12:15:15 Received a message, len: 169148
2023/03/21 12:15:15 Received a message, len: 169659
2023/03/21 12:15:15 Received a message, len: 86000
2023/03/21 12:15:15 Current speed: 4 msg/s
现在所有的消息都准备好消费了。我知道发布是异步的,但是-理论上-“defer conn.Close()”应该在所有缓冲区为空之前保护发布者代码不被重置或关闭连接。互联网上的每个例子都非常类似于我的生产者,所以问题是:我做了什么坏的最后一条消息是不发表在所有没有这个奇怪的时间。睡眠()?
其他测试表明,如果要发布的消息很短(~20字节),则在退出程序之前不需要休眠-消息足够小,可以在连接关闭之前及时发送到RabbitMQ服务器。
1条答案
按热度按时间gk7wooem1#
任何时候消息发布到RabbitMQ,您都应该使用发布者确认-https://www.rabbitmq.com/confirms.html#publisher-confirms
如果不这样做,就不能确定消息是否真的到达了代理并被路由到队列。
请看这个例子,它展示了如何使用confirms:
https://github.com/rabbitmq/amqp091-go/blob/main/_examples/producer/producer.go
请注意,任何网络协议都必须有某种确认机制,以确保客户端应用程序发送的数据已被正确接收。这不是严格意义上的RabbitMQ问题。
**注意:**Team RabbitMQ监控
rabbitmq-users
邮件列表,仅在有时回答StackOverflow上的问题。