rabbitmq Rabbit消费者已连接,但过了一段时间后无法从队列接收消息

t8e9dugd  于 2022-11-08  发布在  RabbitMQ
关注(0)|答案(1)|浏览(341)

我有一个代码,它成功地连接和消费消息从RabbitMQ。但过了一段时间,消费者无法接收消息,但它的连接,而这个问题发生。

package rabbitmq

import (
    "context"
    "fmt"
    "os"
    "runtime"
    "time"

    "github.com/getsentry/sentry-go"
    log "github.com/sirupsen/logrus"
    "github.com/streadway/amqp"
)

type RabbitMQ struct {
    conn             *amqp.Connection
    queues           map[string]amqp.Queue
    connString       string
    rabbitCloseError chan *amqp.Error
    recoveryConsumer []RecoveryConsumer
    // ch         *amqp.Channel
    // exchange_name string
}

type RecoveryConsumer struct {
    queueName   string
    routingKey  string
    handler     func(d amqp.Delivery)
    concurrency int8
}

type (
    Delivery = amqp.Delivery
)

func (r *RabbitMQ) IfExist(queueName string) bool {
    for _, item := range r.recoveryConsumer {
        if item.queueName == queueName {
            return false
        }
    }
    return true
}

func (r *RabbitMQ) RecoverConsumers() {
    for _, i := range r.recoveryConsumer {
        go r.StartConsumer(i.queueName, i.routingKey, i.handler, int(i.concurrency))
        log.Infof("Consumer for %v successfully recovered", i.queueName)
    }
}

func (r *RabbitMQ) Reconnector() {
    for { //nolint
        select {
        case err := <-r.rabbitCloseError:
            log.Errorf("[RabbitMQ] Connection Closed : {'Reason': '%v', 'Code': '%v', 'Recoverable': '%v', 'Server_Side': '%v'", err.Reason, err.Code, err.Recover, err.Server)
            log.Debug("Reconnecting after connection closed")
            sentry.CaptureException(fmt.Errorf("[RabbitMQ] Connection Closed : {'Reason': '%v', 'Code': '%v', 'Recoverable': '%v', 'Server_Side': '%v'", err.Reason, err.Code, err.Recover, err.Server))
            r.connection()
            r.RecoverConsumers()
        }
    }
}

func (r *RabbitMQ) Connect(host string, user string, pass string, virthost string) {
    r.connString = "amqp://" + user + ":" + pass + "@" + host + "/"
    if virthost != "/" || len(virthost) > 0 {
        r.connString += virthost
    }
    r.connection()
    go r.Reconnector()
}

func (r *RabbitMQ) connection() {
    if r.conn != nil {
        if !r.conn.IsClosed() {
            return
        } else {
            log.Info("Reconnecting to RabbitMQ...")
        }
    }

    var err error
    r.conn, err = amqp.Dial(r.connString)
    if err != nil {
        sentry.CaptureException(err)
        log.Fatalf("%s: %s", "Failed to connect to RabbitMQ", err)
    }
    r.conn.Config.Heartbeat = 5 * time.Second
    r.queues = make(map[string]amqp.Queue)

    r.rabbitCloseError = make(chan *amqp.Error)
    r.conn.NotifyClose(r.rabbitCloseError)
    log.Debug("[RabbitMQ] Successfully connected to RabbitMQ")
    log.Infof("Number of Active Thread/Goroutine %v", runtime.NumGoroutine())
}

func (r *RabbitMQ) CreateChannel() *amqp.Channel {
    ch, err := r.conn.Channel()
    if err != nil {
        log.Error(err)
        return nil
    }
    return ch
}

func (r *RabbitMQ) QueueAttach(ch *amqp.Channel, name string) {
    q, err := ch.QueueDeclare(
        name,  // name
        true,  // durable
        false, // delete when unused
        false, // exclusive
        false, // no-wait
        nil,   // arguments
    )
    if err != nil {
        log.Fatalf("%s: %s", "Failed to declare a queue", err)
    }
    r.queues[name] = q
    // r.ch.ExchangeDeclare()
}

func (r *RabbitMQ) TempQueueAttach(ch *amqp.Channel, name string) {
    _, err := ch.QueueDeclare(
        name,  // name
        true,  // durable
        false, // delete when unused
        false, // exclusive
        false, // no-wait
        nil,   // arguments
    )
    if err != nil {
        ch.Close()
        log.Fatalf("%s: %s", "Failed to declare a temporary queue", err)
        sentry.CaptureException(fmt.Errorf("%s: %s", "Failed consume message", err))
    }
}

func (r *RabbitMQ) Publish(ch *amqp.Channel, queue string, body []byte) {
    span := sentry.StartSpan(context.TODO(), "publish message")
    defer span.Finish()
    err := ch.Publish(
        "",                   // exchange
        r.queues[queue].Name, // routing key
        false,                // mandatory
        false,                // immediate
        amqp.Publishing{
            Headers:         map[string]interface{}{},
            ContentType:     "application/json",
            ContentEncoding: "",
            DeliveryMode:    amqp.Persistent,
            Priority:        0,
            CorrelationId:   "",
            ReplyTo:         "",
            Expiration:      "",
            MessageId:       "",
            Timestamp:       time.Now().UTC(),
            Type:            "",
            UserId:          "",
            AppId:           "",
            Body:            body,
        })
    if err != nil {
        sentry.CaptureException(err)
        log.Fatalf("%s: %s", "Failed to publish a message", err)
    }

    log.Debugf("Send message: %s", string(body))
}

func (r *RabbitMQ) StartConsumer(queueName string, routingKey string, handler func(d amqp.Delivery), concurrency int) {
    // prefetch 4x as many messages as we can handle at once
    ok := r.IfExist(queueName)
    if ok {
        r.recoveryConsumer = append(r.recoveryConsumer, RecoveryConsumer{
            queueName:   queueName,
            routingKey:  routingKey,
            handler:     handler,
            concurrency: int8(concurrency),
        })
    }

    ch, err := r.conn.Channel()
    if err != nil {
        log.Error(err)
    }
    prefetchCount := concurrency * 1
    err = ch.Qos(prefetchCount, 0, false)
    if err != nil {
        sentry.CaptureException(err)
        log.Errorf("%s: %s", "Failed QOS", err)
    }
    r.QueueAttach(ch, queueName)

    msgs, err := ch.Consume(
        queueName, // queue
        "",        // consumer
        true,      // auto-ack
        false,     // exclusive
        false,     // no-local
        false,     // no-wait
        nil,       // args
    )
    if err != nil {
        sentry.CaptureException(err)
        log.Fatalf("%s: %s", "Failed consume message", err)
        sentry.CaptureException(fmt.Errorf("%s: %s", "Failed consume message", err))
        os.Exit(1)
    }

    go func() {
        for msg := range msgs {
            handler(msg)
        }
    }()
}

func (r *RabbitMQ) WaitMessage(ch *amqp.Channel, queueName string, timeout time.Duration) []byte {
    st := time.Now()
    for time.Since(st).Seconds() < 1 {
        msg, ok, err := ch.Get(queueName, true)
        if err != nil {
            log.Errorf("Can't consume queue. Error: %s", err.Error())
            sentry.CaptureException(err)
            return nil
        }
        if ok {
            return msg.Body
        }
        time.Sleep(50 * time.Millisecond)
    }
    return nil
}

这可能是什么原因?我知道它应该在兔子端,但客户端库不能显示任何错误。
因为在开始工作之后,消费者继续倾听并成功地工作。

798qvoo8

798qvoo81#

我唯一可以建议你尝试的是使用心跳。这将检测连接是否由于例如网络故障而死。
你可以在这里看看它:https://www.rabbitmq.com/heartbeats.html
我不确定这个,我很久没有使用它了,但是如果你在接收消息位周围放一个try catch,当连接断开时,它可能会出现在catch中。
我希望这对你解决问题有一点帮助。

相关问题