如何检测死RabbitMQ连接?

6psbrbz9  于 2022-11-08  发布在  RabbitMQ
关注(0)|答案(4)|浏览(240)

我在Go语言中有一个RabbitMQ消费者脚本。这是一个来自RabbitMQ tutorial的简单脚本,它使用了streadway/amqp库。
问题是,如果RabbitMQ服务器停止,消费者脚本就不会退出;并且当RabbitMQ服务器重新启动时,消费者不再接收消息。
是否有一种方法可以检测到消费者连接已断开并重新连接,或者至少终止消费者脚本?
我知道库为连接设置了默认的10秒心跳间隔;有没有可能利用一下?

func main() {
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    q, err := ch.QueueDeclare(
        "test_task_queue", // name
        true,         // durable
        false,        // delete when unused
        false,        // exclusive
        false,        // no-wait
        nil,          // arguments
    )
    failOnError(err, "Failed to declare a queue")

    err = ch.Qos(
        1,     // prefetch count
        0,     // prefetch size
        false, // global
    )
    failOnError(err, "Failed to set QoS")

    msgs, err := ch.Consume(
        q.Name, // queue
        "",     // consumer
        false,  // auto-ack
        false,  // exclusive
        false,  // no-local
        false,  // no-wait
        nil,    // args
    )
    failOnError(err, "Failed to register a consumer")

    forever := make(chan bool)

    go func() {
        for d := range msgs {
            log.Printf("Received a message: %s", d.Body)
            d.Ack(false)
            dot_count := bytes.Count(d.Body, []byte("."))
            t := time.Duration(dot_count)
            time.Sleep(t * time.Second)
            log.Printf("Done")
        }
    }()

    log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
    <-forever
}
iibxawm4

iibxawm41#

amqp.Connection具有NotifyClose()方法,该方法返回通道信号传输或协议错误。

for {  //reconnection loop
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") //setup
    notify := conn.NotifyClose(make(chan *amqp.Error)) //error channel
...
    ch, err := conn.Channel()
    msgs, err := ch.Consume(
...
    for{  //receive loop
        select {  //check connection
            case err = <-notify:
            //work with error
            break //reconnect
        case d = <- msgs:
            //work with message
        ...
        }
    }
}
l7wslrjt

l7wslrjt2#

有几种方法可以做到这一点:检查传送通道是否关闭或使用Channel.NotifyClose

检查传送通道

启动消费者后,您将从传递通道接收。如您所知,接收操作可能采用x, ok := <-ch的特殊形式,其中okx具有零值时为false,原因是通道已关闭(并且为空):

conn, _ := amqp.Dial(url)
ch, _ := conn.Channel()

delivery, _ := ch.Consume(
        queueName,
        consumerName,
        true,  // auto ack
        false, // exclusive
        false, // no local
        true,  // no wait,
        nil,   // table
    )

for {
    payload, ok := <- delivery
    if !ok {
        // ... channel closed
        return
    }
}

这是因为,当AMQP通道关闭或发生错误时,Go通道<-chan amqp.Delivery将关闭:
[It]继续传送至传回的chan Delivery,直到发生Channel.Cancel、Connection.Close、Channel.Close或AMQP例外状况。
使用Channel.NotifyClose
这很简单,原理也一样:
NotifyClose注册一个侦听器,用于在服务器以Connection.Close或Channel.Close方法的形式发送通道或连接异常时使用。
NotifyClose返回的通道与您作为参数传递的通道相同;该方法只在内部注册它,因此您可以执行以下操作:

errC := ch.NotifyClose(make(chan *amqp.Error, n))

其中,n是非零缓冲区大小。确保将缓冲通道传递到NotifyClose,否则,根据代码的结构,库可能会在发送时阻塞。
然后,您可以在errC通道上进行接收,并根据错误类型采取措施。简而言之,错误可能是:

  • 连接错误,通常无法恢复
  • 通道错误,也称为软异常,通常可通过重置连接来恢复
  • nil如果程序故意调用conn.Close()

要了解错误是否可恢复,您可以检查amqp.ErrorCode字段和/或Recover字段,如果发生软异常,该字段将设置为true。
下面的函数显示了如何区分错误代码-这是作为附加信息提供的。对于一般情况,只需检查Error.Recover

const (
    ConnectionError = 1
    ChannelError    = 2
)

func isConnectionError(err *amqp.Error) bool {
    return errorType(err.Code) == ConnectionError
}

func isChannelError(err *amqp.Error) bool {
    return errorType(err.Code) == ChannelError
}

func errorType(code int) int {
    switch code {
    case
        amqp.ContentTooLarge,    // 311
        amqp.NoConsumers,        // 313
        amqp.AccessRefused,      // 403
        amqp.NotFound,           // 404
        amqp.ResourceLocked,     // 405
        amqp.PreconditionFailed: // 406
        return ChannelError

    case
        amqp.ConnectionForced, // 320
        amqp.InvalidPath,      // 402
        amqp.FrameError,       // 501
        amqp.SyntaxError,      // 502
        amqp.CommandInvalid,   // 503
        amqp.ChannelError,     // 504
        amqp.UnexpectedFrame,  // 505
        amqp.ResourceError,    // 506
        amqp.NotAllowed,       // 530
        amqp.NotImplemented,   // 540
        amqp.InternalError:    // 541
        fallthrough

    default:
        return ConnectionError
    }
}
7bsow1i6

7bsow1i63#

这可能会帮助一些人
第一个

7vux5j2d

7vux5j2d4#

没有发现go-amqp库实现了连接池的断开和重新连接功能。
github上有一个基于Amqp二次打包的开源代码,已经支持断线后重连和异常重连,代码使用起来也比较简单,每个服务都有一个连接和通道。
Source Code here
示例代码:

package main

import (
    "go-rabbit/rabbit"
)

/*
    support isconnection and reconnection function
    And Failure re-send function
    @author : Bill

* /

func main() {
    var(
        addr = "amqp://guest:guest@localhost:5672/"
        queue = "testQueue"
        exchange = "test_exchange"
        routerKey = "/test"
        msg = "test1!"

        //delay
        delayQueue = "delay_queue"
        delayExchange = "delay_exchange"
        delayRouterKey = "delay_exchange"
        prefix = "v1_prefix"
        sep = "_"
        eType = "F"
        _ttl = 60 * 1000
    )

    var rabbitProduct1 = rabbit.NewRabbitProduct(addr,_ttl,prefix,sep,delayExchange,delayQueue,delayRouterKey)
    // register recycle
    go rabbitProduct1.InitDefdelay(false)
    go rabbitProduct1.InitDefdelay(true)
    go rabbitProduct1.RegisterDelayWithPreFix("delay_queue","delay_exchange","delay_exchange")

    // ttl is dead recycle time if ttl > 0 then recycle
    rabbitProduct1.PubMessage(true,eType,queue,exchange,routerKey,msg,rabbitProduct1.GetBool(1),rabbitProduct1.GetBool(0),_ttl)

}

希望它能帮助你或给你一些想法

相关问题