我有一个代码,它成功地连接和消费消息从RabbitMQ。但过了一段时间,消费者无法接收消息,但它的连接,而这个问题发生。
package rabbitmq
import (
"context"
"fmt"
"os"
"runtime"
"time"
"github.com/getsentry/sentry-go"
log "github.com/sirupsen/logrus"
"github.com/streadway/amqp"
)
type RabbitMQ struct {
conn *amqp.Connection
queues map[string]amqp.Queue
connString string
rabbitCloseError chan *amqp.Error
recoveryConsumer []RecoveryConsumer
// ch *amqp.Channel
// exchange_name string
}
type RecoveryConsumer struct {
queueName string
routingKey string
handler func(d amqp.Delivery)
concurrency int8
}
type (
Delivery = amqp.Delivery
)
func (r *RabbitMQ) IfExist(queueName string) bool {
for _, item := range r.recoveryConsumer {
if item.queueName == queueName {
return false
}
}
return true
}
func (r *RabbitMQ) RecoverConsumers() {
for _, i := range r.recoveryConsumer {
go r.StartConsumer(i.queueName, i.routingKey, i.handler, int(i.concurrency))
log.Infof("Consumer for %v successfully recovered", i.queueName)
}
}
func (r *RabbitMQ) Reconnector() {
for { //nolint
select {
case err := <-r.rabbitCloseError:
log.Errorf("[RabbitMQ] Connection Closed : {'Reason': '%v', 'Code': '%v', 'Recoverable': '%v', 'Server_Side': '%v'", err.Reason, err.Code, err.Recover, err.Server)
log.Debug("Reconnecting after connection closed")
sentry.CaptureException(fmt.Errorf("[RabbitMQ] Connection Closed : {'Reason': '%v', 'Code': '%v', 'Recoverable': '%v', 'Server_Side': '%v'", err.Reason, err.Code, err.Recover, err.Server))
r.connection()
r.RecoverConsumers()
}
}
}
func (r *RabbitMQ) Connect(host string, user string, pass string, virthost string) {
r.connString = "amqp://" + user + ":" + pass + "@" + host + "/"
if virthost != "/" || len(virthost) > 0 {
r.connString += virthost
}
r.connection()
go r.Reconnector()
}
func (r *RabbitMQ) connection() {
if r.conn != nil {
if !r.conn.IsClosed() {
return
} else {
log.Info("Reconnecting to RabbitMQ...")
}
}
var err error
r.conn, err = amqp.Dial(r.connString)
if err != nil {
sentry.CaptureException(err)
log.Fatalf("%s: %s", "Failed to connect to RabbitMQ", err)
}
r.conn.Config.Heartbeat = 5 * time.Second
r.queues = make(map[string]amqp.Queue)
r.rabbitCloseError = make(chan *amqp.Error)
r.conn.NotifyClose(r.rabbitCloseError)
log.Debug("[RabbitMQ] Successfully connected to RabbitMQ")
log.Infof("Number of Active Thread/Goroutine %v", runtime.NumGoroutine())
}
func (r *RabbitMQ) CreateChannel() *amqp.Channel {
ch, err := r.conn.Channel()
if err != nil {
log.Error(err)
return nil
}
return ch
}
func (r *RabbitMQ) QueueAttach(ch *amqp.Channel, name string) {
q, err := ch.QueueDeclare(
name, // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
log.Fatalf("%s: %s", "Failed to declare a queue", err)
}
r.queues[name] = q
// r.ch.ExchangeDeclare()
}
func (r *RabbitMQ) TempQueueAttach(ch *amqp.Channel, name string) {
_, err := ch.QueueDeclare(
name, // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
ch.Close()
log.Fatalf("%s: %s", "Failed to declare a temporary queue", err)
sentry.CaptureException(fmt.Errorf("%s: %s", "Failed consume message", err))
}
}
func (r *RabbitMQ) Publish(ch *amqp.Channel, queue string, body []byte) {
span := sentry.StartSpan(context.TODO(), "publish message")
defer span.Finish()
err := ch.Publish(
"", // exchange
r.queues[queue].Name, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
Headers: map[string]interface{}{},
ContentType: "application/json",
ContentEncoding: "",
DeliveryMode: amqp.Persistent,
Priority: 0,
CorrelationId: "",
ReplyTo: "",
Expiration: "",
MessageId: "",
Timestamp: time.Now().UTC(),
Type: "",
UserId: "",
AppId: "",
Body: body,
})
if err != nil {
sentry.CaptureException(err)
log.Fatalf("%s: %s", "Failed to publish a message", err)
}
log.Debugf("Send message: %s", string(body))
}
func (r *RabbitMQ) StartConsumer(queueName string, routingKey string, handler func(d amqp.Delivery), concurrency int) {
// prefetch 4x as many messages as we can handle at once
ok := r.IfExist(queueName)
if ok {
r.recoveryConsumer = append(r.recoveryConsumer, RecoveryConsumer{
queueName: queueName,
routingKey: routingKey,
handler: handler,
concurrency: int8(concurrency),
})
}
ch, err := r.conn.Channel()
if err != nil {
log.Error(err)
}
prefetchCount := concurrency * 1
err = ch.Qos(prefetchCount, 0, false)
if err != nil {
sentry.CaptureException(err)
log.Errorf("%s: %s", "Failed QOS", err)
}
r.QueueAttach(ch, queueName)
msgs, err := ch.Consume(
queueName, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
sentry.CaptureException(err)
log.Fatalf("%s: %s", "Failed consume message", err)
sentry.CaptureException(fmt.Errorf("%s: %s", "Failed consume message", err))
os.Exit(1)
}
go func() {
for msg := range msgs {
handler(msg)
}
}()
}
func (r *RabbitMQ) WaitMessage(ch *amqp.Channel, queueName string, timeout time.Duration) []byte {
st := time.Now()
for time.Since(st).Seconds() < 1 {
msg, ok, err := ch.Get(queueName, true)
if err != nil {
log.Errorf("Can't consume queue. Error: %s", err.Error())
sentry.CaptureException(err)
return nil
}
if ok {
return msg.Body
}
time.Sleep(50 * time.Millisecond)
}
return nil
}
这可能是什么原因?我知道它应该在兔子端,但客户端库不能显示任何错误。
因为在开始工作之后,消费者继续倾听并成功地工作。
1条答案
按热度按时间798qvoo81#
我唯一可以建议你尝试的是使用心跳。这将检测连接是否由于例如网络故障而死。
你可以在这里看看它:https://www.rabbitmq.com/heartbeats.html。
我不确定这个,我很久没有使用它了,但是如果你在接收消息位周围放一个try catch,当连接断开时,它可能会出现在catch中。
我希望这对你解决问题有一点帮助。