Go语言 MQTT客户机未接收到其他客户机发送的消息

u5rb5r59  于 2023-03-21  发布在  Go
关注(0)|答案(1)|浏览(343)

我遇到了一个问题,在几次断开连接和重新连接之后,我的mqtt客户机(比如A)停止接收来自另一个发布客户机(B)的消息。
如果我使用mosquitto_sub手动订阅B正在发布的主题,我可以看到所有消息都按预期发布。(mosquitto_pub)到订阅了A的主题,A也接收这些消息,因此订阅看起来有效。只有当B发布到该主题时,A未接收到消息。
这两个客户端都连接到一个mosquito代理(版本1.6.12)。所有的东西都运行在RaspberryPi CM 3上。客户端是用Go语言编写的,带有paho mqtt库(v1.4.2),并作为systemd服务启动。客户端使用以下选项初始化:

opts.SetCleanSession(false)
opts.SetKeepAlive(10 * time.Second)
opts.SetPingTimeout(1 * time.Second)
opts.SetAutoReconnect(true)
opts.SetConnectTimeout(15 * time.Second)

另外,两个客户端都有一个唯一的ID。我查看了mosquito日志,注意到一些消息在客户端订阅之前就被发送到了重新连接的客户端。

1678786214: Sending PUBLISH to telemetry (d1, q1, r0, m722, 'example-topic-1/a', ... (109 bytes))
1678786214: Sending PUBLISH to telemetry (d1, q1, r0, m723, 'example-topic-1/b', ... (118 bytes))
1678786214: Sending PUBLISH to telemetry (d1, q1, r0, m904, 'example-topic-1/a', ... (109 bytes))
1678786214: Sending PUBLISH to telemetry (d1, q1, r0, m1085, 'example-topic-1/a', ... (109 bytes))
1678786214: Sending PUBLISH to telemetry (d1, q1, r0, m1086, 'example-topic-1/b', ... (118 bytes))
1678786214: Sending PUBLISH to telemetry (d1, q1, r0, m1267, 'example-topic-1/a', ... (109 bytes))
...
...
1678786214: Sending PUBLISH to telemetry (d0, q1, r0, m2293, 'example-topic-1/c'1, ... (119 bytes))
1678786214: Received PUBLISH from telemetry (d0, q2, r1, m1, 'example-topic-2', ... (7 bytes))
1678786214: Sending PUBREC to telemetry (m1, rc0)
1678786214: Received PUBLISH from telemetry (d0, q2, r1, m2, 'example-topic-1/a', ... (109 bytes))
1678786214: Sending PUBREC to telemetry (m2, rc0)
1678786214: Received PUBREL from telemetry (Mid: 1)
1678786214: Sending PUBCOMP to telemetry (m1)
1678786214: Received PUBREL from telemetry (Mid: 2)
1678786214: Sending PUBCOMP to telemetry (m2)
1678786214: Received SUBSCRIBE from telemetry
1678786214:         shutdown (QoS 2)
1678786214: telemetry 2 shutdown
1678786214: Sending SUBACK to telemetry
1678786214: Received SUBSCRIBE from telemetry
1678786214:         example-topic-1/# (QoS 1)
1678786214: telemetry 1 example-topic-1/#
1678786214: Sending SUBACK to telemetry

在mqtt库的文档中指出
如果以前已创建QOS 1+订阅,并且在CleanSession设置为false的情况下进行连接,则代理可能会在调用Subscribe之前传递保留的消息。若要处理这些消息,请使用AddRoute配置处理程序或设置DefaultPublishHandler。
这是否意味着消息将丢失,或者它实际上以某种方式阻止了主题?
到目前为止,我尝试将mosquito从1.6.10版本升级到1.6.12版本,但没有解决这个问题。重启mosquito.service似乎可以解决这个问题,但并不是真正的解决方案。我的下一步是设置一个DefaultPublishHandler来处理过早发送的消息。
提前感谢您所提供的任何帮助。如果需要更多的信息,请告诉我!

编辑:

我又试了一下,查看了mosquito日志,可以缩小问题的范围,因为它似乎与消息的QoS级别有关。
当客户端A没有接收到客户端B发送的任何消息时,我尝试注册一个新客户端(使用mosquitto_sub),我们称之为C,它接收来自客户端B的消息,而不管订阅的QoS级别。客户端A仅接收QoS为0的消息。如果指定QoS为1或2,则客户端A不会接收任何消息。由于客户端B发送QoS为1的所有消息,代理似乎没有向客户端A发送任何QoS〉0的消息,而是向其他客户端发送。
所有客户端的OrderMatters都设置为false,代理的唯一配置是max_queued_messages 0

使用QoS 1发布的客户端B

1678874048: Sending PUBACK to clientB (m732, rc0)
1678874048: Received PUBLISH from clientB (d0, q1, r0, m733, 'topic/b', ... (128 bytes))
1678874048: Sending PUBACK to clientB (m733, rc0)
1678874048: Received PUBLISH from clientB (d0, q1, r0, m734, 'topic/c', ... (135 bytes))
1678874048: Sending PUBACK to clientB (m734, rc0)

注意没有消息发布到clientA。

使用QoS 1发布的ClientC

1678874088: No will message specified.
1678874088: Sending CONNACK to mosq-glZkfjX79CriC0qOK5 (0, 0)
1678874088: Received PUBLISH from mosq-glZkfjX79CriC0qOK5 (d0, q1, r0, m1, 'topic/a', ... (130 bytes))
1678874088: Sending PUBACK to mosq-glZkfjX79CriC0qOK5 (m1, rc0)
1678874088: Received DISCONNECT from mosq-glZkfjX79CriC0qOK5
1678874088: Client mosq-glZkfjX79CriC0qOK5 disconnected.

与使用新客户端发布时相同。

使用QoS 0发布的ClientC

1678874043: No will message specified.
1678874043: Sending CONNACK to mosq-pba5gDo7lNe7Sqz5jd (0, 0)
1678874043: Received PUBLISH from mosq-pba5gDo7lNe7Sqz5jd (d0, q0, r0, m0, 'topic/a', ... (130 bytes))
1678874043: Sending PUBLISH to clientA (d0, q0, r0, m0, 'topic/a', ... (130 bytes))
1678874043: Received DISCONNECT from mosq-pba5gDo7lNe7Sqz5jd
1678874043: Client mosq-pba5gDo7lNe7Sqz5jd disconnected.

使用新客户端和QoS为0的发布将发布到clientA。

使用QoS 1订阅的客户端C

1678874687: No will message specified.
1678874687: Sending CONNACK to mosq-vkwirAtvYxHwoLnCQH (0, 0)
1678874687: Received SUBSCRIBE from mosq-vkwirAtvYxHwoLnCQH
1678874687:         topic/+ (QoS 1)
1678874687: mosq-vkwirAtvYxHwoLnCQH 1 topic/+
1678874687: Sending SUBACK to mosq-vkwirAtvYxHwoLnCQH
...
1678874695: Received PUBLISH from clientB (d0, q1, r0, m673, 'topic/a', ... (137 bytes))
1678874695: Sending PUBACK to clientB (m673, rc0)
1678874695: Sending PUBLISH to mosq-vkwirAtvYxHwoLnCQH (d0, q1, r0, m3, 'topic/a', ... (137 bytes))
1678874695: Received PUBLISH from clientB (d0, q1, r0, m674, 'topic/b', ... (130 bytes))
1678874695: Sending PUBACK to clientB (m674, rc0)
1678874695: Received PUBACK from mosq-vkwirAtvYxHwoLnCQH (Mid: 3, RC:0)
1678874695: Sending PUBLISH to mosq-vkwirAtvYxHwoLnCQH (d0, q1, r0, m4, 'topic/b', ... (130 bytes))
1678874695: Received PUBACK from mosq-vkwirAtvYxHwoLnCQH (Mid: 4, RC:0)

订阅QoS为1的新客户端也可以...
老实说,我现在有点不知道,因为行为是相当奇怪的imo。callbackHandlers似乎没有阻止在clientA的一边,因为消息与QoS 0仍然得到处理。有没有任何设置或配置,可以搞乱一个QoS 1订阅?

编辑2:

下面的日志基本上是重新启动服务之前和之后的日志,这导致了所描述的行为。由于不止这两个客户端不断发送东西,我再次删除了一些部分,但关于这两个客户端的所有内容都应该在这里。从第一次连接和一些工作发布到重新启动和消息不再发送。
我注意到的另一件事是,当关闭服务时,客户端从不调用Disconnect。它试图取消订阅,有时甚至没有完成。这会导致这样的问题吗?

1678880076: New client connected from 127.0.0.1 as clientA (p2, c0, k10).
1678880076: No will message specified.
1678880076: Sending CONNACK to clientA (1, 0)
1678880076: Sending PUBLISH to clientA (d1, q1, r0, m18734, 'topic1/matlab_version_expected', ... (152 bytes))
1678880076: Sending PUBLISH to clientA (d1, q1, r0, m18735, 'topic1/coreagent_ota_state', ... (112 bytes))
1678880076: Sending PUBLISH to clientA (d1, q1, r0, m19039, 'topic1/telemetry_data_sent', ... (109 bytes))
1678880076: Sending PUBLISH to clientA (d1, q1, r0, m19041, 'topic1/telemetry_queue_size', ... (119 bytes))
1678880076: Sending PUBLISH to clientA (d1, q1, r0, m19331, 'topic1/telemetry_data_sent', ... (109 bytes))
1678880076: Sending PUBLISH to clientA (d1, q1, r0, m19333, 'topic1/telemetry_queue_size', ... (119 bytes))
1678880076: Sending PUBLISH to clientA (d1, q1, r0, m19453, 'topic1/telemetry_data_sent', ... (109 bytes))
1678880076: Sending PUBLISH to clientA (d1, q1, r0, m19454, 'topic1/telemetry_queue_size', ... (119 bytes))
1678880076: Sending PUBLISH to clientA (d1, q1, r0, m19575, 'topic1/telemetry_data_sent', ... (109 bytes))
1678880076: Sending PUBLISH to clientA (d1, q1, r0, m19576, 'topic1/telemetry_queue_size', ... (119 bytes))
1678880076: Sending PUBLISH to clientA (d1, q1, r0, m20053, 'topic1/telemetry_data_sent', ... (109 bytes))
1678880076: Sending PUBLISH to clientA (d1, q1, r0, m20054, 'topic1/telemetry_queue_size', ... (119 bytes))
1678880076: Sending PUBLISH to clientA (d1, q1, r0, m20235, 'topic1/telemetry_data_sent', ... (109 bytes))
1678880076: Sending PUBLISH to clientA (d1, q1, r0, m20236, 'topic1/telemetry_queue_size', ... (119 bytes))
1678880076: Sending PUBLISH to clientA (d1, q1, r0, m20349, 'topic1/ostree_sha_rollback', ... (172 bytes))
... (lots of PUBLISH withouth PUBACK from clientA)
1678880076: Sending PUBLISH to clientA (d1, q1, r0, m20540, 'topic1/telemetry_queue_size', ... (119 bytes))
1678880076: Received PUBLISH from clientA (d0, q2, r1, m1, 'conn-status', ... (7 bytes))
1678880076: Sending PUBREC to clientA (m1, rc0)
1678880076: Received PUBLISH from clientA (d0, q2, r1, m2, 'topic1/telemetry_data_sent', ... (109 bytes))
1678880076: Sending PUBREC to clientA (m2, rc0)
1678880076: Received PUBREL from clientA (Mid: 1)
1678880076: Sending PUBCOMP to clientA (m1)
1678880076: Received PUBREL from clientA (Mid: 2)
1678880076: Sending PUBCOMP to clientA (m2)
1678880076: Sending PUBLISH to clientA (d0, q1, r0, m20720, 'topic1/telemetry_data_sent', ... (109 bytes))
1678880076: Received PUBLISH from clientA (d0, q2, r1, m3, 'topic1/telemetry_queue_size', ... (119 bytes))
1678880076: Sending PUBREC to clientA (m3, rc0)
1678880076: Received SUBSCRIBE from clientA
1678880076:         shutdown (QoS 2)
1678880076: clientA 2 shutdown
1678880076: Sending SUBACK to clientA
1678880076: Received PUBREL from clientA (Mid: 3)
1678880076: Sending PUBCOMP to clientA (m3)
1678880076: Sending PUBLISH to clientA (d0, q1, r0, m20721, 'topic1/telemetry_queue_size', ... (119 bytes))
1678880076: Received SUBSCRIBE from clientA
1678880076:         telemetry/+ (QoS 1)
1678880076: clientA 1 telemetry/+
1678880076: Sending SUBACK to clientA
1678880076: Sending PUBLISH to clientA (d0, q1, r1, m20722, 'telemetry/geolocation', ... (51 bytes))
1678880076: Received PUBACK from clientA (Mid: 20722, RC:0)
1678880076: Received SUBSCRIBE from clientA
1678880076:         telemetry-batch (QoS 1)
1678880076: clientA 1 telemetry-batch
1678880076: Sending SUBACK to clientA
1678880076: Received SUBSCRIBE from clientA
1678880076:         topic1/# (QoS 1)
1678880076: clientA 1 topic1/#
1678880076: Sending SUBACK to clientA
1678880076: Sending PUBLISH to clientA (d0, q1, r1, m20723, 'topic1/telemetry_data_sent', ... (109 bytes))
1678880076: Sending PUBLISH to clientA (d0, q1, r1, m20724, 'topic1/telemetry_queue_size', ... (119 bytes))
1678880076: Received PUBACK from clientA (Mid: 20723, RC:0)
1678880076: Received PUBACK from clientA (Mid: 20724, RC:0)
1678880076: Received PUBLISH from clientA (d0, q2, r1, m8, 'conn-status', ... (6 bytes))
1678880076: Sending PUBREC to clientA (m8, rc0)
1678880076: Received PUBREL from clientA (Mid: 8)
1678880076: Sending PUBCOMP to clientA (m8)
...
1678880083: New connection from 127.0.0.1 on port 1883.
1678880083: New client connected from 127.0.0.1 as clientB (p2, c0, k10).
1678880083: No will message specified.
1678880083: Sending CONNACK to clientB (1, 0)
1678880085: Received PUBLISH from clientA (d0, q2, r1, m9, 'session', ... (36 bytes))
1678880085: Sending PUBREC to clientA (m9, rc0)
1678880085: Received PUBREL from clientA (Mid: 9)
1678880085: Sending PUBCOMP to clientA (m9)
...
1678880086: Received PUBLISH from clientB (d0, q1, r0, m3, 'topic1/net_response', ... (118 bytes))
1678880086: Sending PUBACK to clientB (m3, rc0)
1678880086: Sending PUBLISH to clientA (d0, q1, r0, m20725, 'topic1/net_response', ... (118 bytes))
1678880086: Received PUBACK from clientA (Mid: 20725, RC:0)
1678880086: Received PUBLISH from clientB (d0, q1, r0, m4, 'topic1/public_ip_addr', ... (116 bytes))
1678880086: Sending PUBACK to clientB (m4, rc0)
1678880086: Sending PUBLISH to clientA (d0, q1, r0, m20726, 'topic1/public_ip_addr', ... (116 bytes))
1678880086: Received PUBACK from clientA (Mid: 20726, RC:0)
... (lots of PUBLISH with PUBACK from telemetry)
1678880086: Sending PUBLISH to clientA (d0, q1, r0, m20742, 'topic1/swap', ... (169 bytes))
1678880086: Received PUBACK from clientA (Mid: 20742, RC:0)
1678880086: Received PUBLISH from clientB (d0, q1, r0, m21, 'topic1/syslog', ... (508219 bytes))
1678880086: Sending PUBACK to clientB (m21, rc0)
1678880086: Sending PUBLISH to clientA (d0, q1, r0, m20743, 'topic1/syslog', ... (508219 bytes))
1678880086: Received PUBACK from clientA (Mid: 20743, RC:0)
...
1678880088: Received PUBLISH from clientB (d0, q1, r0, m22, 'topic1/version_ca_lockdown', ... (133 bytes))
1678880088: Sending PUBACK to clientB (m22, rc0)
... (here the services get restarted and afterwards the subscription seems broken for QoS >= 1)
1678880118: Received UNSUBSCRIBE from clientA
1678880118:         telemetry/+
1678880118: clientA telemetry/+
1678880118: Sending UNSUBACK to clientA
1678880118: Socket error on client clientA, disconnecting.
1678880119: Received PUBLISH from clientB (d0, q1, r0, m136, 'topic1/release_id', ... (126 bytes))
1678880119: Sending PUBACK to clientB (m136, rc0)
1678880119: Received PUBLISH from clientB (d0, q1, r0, m137, 'topic1/version_ca_configurator', ... (137 bytes))
1678880119: Sending PUBACK to clientB (m137, rc0)
...
1678880125: New connection from 127.0.0.1 on port 1883.
1678880125: New client connected from 127.0.0.1 as clientA (p2, c0, k10).
1678880125: No will message specified.
1678880125: Sending CONNACK to clientA (1, 0)
1678880125: Sending PUBLISH to clientA (d1, q1, r0, m18734, 'topic1/matlab_version_expected', ... (152 bytes))
1678880125: Sending PUBLISH to clientA (d1, q1, r0, m18735, 'topic1/coreagent_ota_state', ... (112 bytes))
... (again, lots of PUBLISH without PUBACK)
1678880125: Sending PUBLISH to clientA (d0, q1, r0, m20873, 'topic1/proc_led', ... (151 bytes))
1678880125: Sending PUBLISH to clientA (d0, q1, r0, m20874, 'topic1/coreagent_ota_state', ... (112 bytes))
1678880125: Sending PUBLISH to clientA (d0, q1, r0, m20875, 'topic1/ssid_wlan0', ... (99 bytes))
1678880125: Received PUBLISH from clientA (d0, q2, r1, m1, 'conn-status', ... (7 bytes))
1678880125: Sending PUBREC to clientA (m1, rc0)
1678880125: Received PUBLISH from clientA (d0, q2, r1, m2, 'topic1/telemetry_data_sent', ... (109 bytes))
1678880125: Sending PUBREC to clientA (m2, rc0)
1678880125: Received PUBREL from clientA (Mid: 1)
1678880125: Sending PUBCOMP to clientA (m1)
1678880125: Received PUBREL from clientA (Mid: 2)
1678880125: Sending PUBCOMP to clientA (m2)
1678880125: Received SUBSCRIBE from clientA
1678880125:         shutdown (QoS 2)
1678880125: clientA 2 shutdown
1678880125: Sending SUBACK to clientA
1678880125: Received PUBLISH from clientA (d0, q2, r1, m4, 'topic1/telemetry_queue_size', ... (119 bytes))
1678880125: Sending PUBREC to clientA (m4, rc0)
1678880125: Received SUBSCRIBE from clientA
1678880125:         telemetry/+ (QoS 1)
1678880125: clientA 1 telemetry/+
1678880125: Sending SUBACK to clientA
1678880125: Received PUBREL from clientA (Mid: 4)
1678880125: Sending PUBCOMP to clientA (m4)
1678880125: Received SUBSCRIBE from clientA
1678880125:         telemetry-batch (QoS 1)
1678880125: clientA 1 telemetry-batch
1678880125: Sending SUBACK to clientA
1678880125: Received SUBSCRIBE from clientA
1678880125:         topic1/# (QoS 1)
1678880125: clientA 1 topic1/#
1678880125: Sending SUBACK to clientA
1678880126: Received PUBLISH from clientA (d0, q2, r1, m8, 'conn-status', ... (6 bytes))
1678880126: Sending PUBREC to clientA (m8, rc0)
1678880126: Received PUBREL from clientA (Mid: 8)
1678880126: Sending PUBCOMP to clientA (m8)
...
1678880133: New connection from 127.0.0.1 on port 1883.
1678880133: New client connected from 127.0.0.1 as clientB (p2, c0, k10).
1678880133: No will message specified.
1678880133: Sending CONNACK to clientB (1, 0)
...
1678880135: Received PUBLISH from clientB (d0, q1, r0, m3, 'topic1/provision_date', ... (131 bytes))
1678880135: Sending PUBACK to clientB (m3, rc0)
1678880135: Received PUBLISH from clientB (d0, q1, r0, m4, 'topic1/processes', ... (99 bytes))
... (There are no more PUBLISH message from broker to clientA)
1678880135: Sending PUBACK to clientB (m20, rc0)
1678880135: Received PUBLISH from clientB (d0, q1, r0, m21, 'topic1/net_response', ... (118 bytes))
1678880135: Sending PUBACK to clientB (m21, rc0)

我仍在开发一个最小的、可重复的示例,但不幸的是,到目前为止还没有在嵌入式环境之外重新创建错误。

编辑3:

为了完整起见,我设法构建了一个最小的、可重复的示例,并认为我也会将其发布。

package main

import (
    "os"
    "os/signal"
    "syscall"
    "time"

    mqtt "github.com/eclipse/paho.mqtt.golang"
    "github.com/sirupsen/logrus"
)

func main() {
    c := make(chan os.Signal, 1)
    signal.Notify(c, os.Interrupt, syscall.SIGTERM)

    stop := make(chan bool, 1)
    optsA := mqtt.NewClientOptions().AddBroker("tcp://localhost:1883")
    logrus.Info("setup and connect clientA")
    clientA := setup(optsA, "clientA")
    if err := connect(clientA); err != nil {
        panic(err)
    }

    if tok := clientA.Subscribe("topic/+", 1, func(c mqtt.Client, m mqtt.Message) {
        logrus.Infof("received on topic %s ; message: %s", string(m.Topic()), string(m.Payload()))
    }); tok.Wait() && tok.Error() != nil {
        panic(tok.Error())
    }

    logrus.Infof("setup and connect clientB")
    optsB := mqtt.NewClientOptions().AddBroker("tcp://localhost:1883")
    clientB := setup(optsB, "clientB")
    if err := connect(clientB); err != nil {
        panic(err)
    }

    logrus.Info("start publishing from clientB")
    go publish(clientB, stop)

    logrus.Info("let clientA receive some messages from clientB")
    time.Sleep(500 * time.Millisecond)

    clientA.Disconnect(10)

    logrus.Info("wait until ClientB published more than max_inflight_messages")
    time.Sleep(1100 * time.Millisecond)

    logrus.Infof("connecting clientA again")
    clientA = setup(optsA, "clientA")
    if err := connect(clientA); err != nil {
        panic(err)
    }

    logrus.Info("wait shortly before subscribing")
    time.Sleep(1 * time.Second)

    logrus.Info("subscribe with clientA")
    if tok := clientA.Subscribe("topic/+", 1, func(c mqtt.Client, m mqtt.Message) {
        logrus.Infof("received on topic %s ; message: %s", string(m.Topic()), string(m.Payload()))
    }); tok.Wait() && tok.Error() != nil {
        panic(tok.Error())
    }

    <-c
    stop <- true
}

func publish(client mqtt.Client, stop chan bool) {
    for {
        select {
        case <-stop:
            return
        default:
            if tok := client.Publish("topic/exmaple", 1, false, "message"); tok.Wait() && tok.Error() != nil {
                logrus.WithError(tok.Error()).Warnf("failed to publish, continuing")
                continue
            }
            time.Sleep(50 * time.Millisecond)
        }
    }
}

func connect(client mqtt.Client) error {
    if tok := client.Connect(); tok.Wait() && tok.Error() != nil {
        logrus.WithError(tok.Error()).Error("failed to connect")
        return tok.Error()
    }
    return nil
}

func setup(opts *mqtt.ClientOptions, id string) mqtt.Client {
    opts.SetClientID(id)
    opts.SetOrderMatters(false)
    opts.SetCleanSession(false)
    // opts.SetDefaultPublishHandler(func(client mqtt.Client, msg mqtt.Message) {
    //  logrus.Infof("received message on topic %s which does not match any subscriptions (yet)", msg.Topic())
    // })
    opts.SetKeepAlive(10 * time.Second)
    opts.SetPingTimeout(1 * time.Second)
    opts.SetAutoReconnect(true)
    opts.SetConnectTimeout(15 * time.Second)

    cl := mqtt.NewClient(opts)
    return cl
}

正如@布里茨在回答中解释的那样,如果我取消对DefaultPublishHandler的注解,消息就会得到确认,订阅也会对更多消息起作用。

huus2vyu

huus2vyu1#

谢谢你的日志Mosquitto没有收到连接建立后立即收到的消息的PUBACK这一事实使我找到了可能的原因。
使用Mosquitto v1.6.x时,max_inflight_messages默认为10;所以在10条未确认的消息之后,Mosquitto将不会再发送任何消息。这就是它停止发送到clientA的原因。
如果没有处理程序,paho.mqtt.golang将不确认消息(如果您启用日志记录,它将在发生这种情况时输出警告)。这样做的理由在时间的迷雾中丢失了(我加上了警告)但我怀疑这是因为,没有一个处理程序,不能说消息已经被处理(因此不应被确认)。早期版本的Mosquitto用于重新发送未被确认的消息,但现在不再是这种情况(并且在v5规范中被禁止),这意味着它实际上是一个永久块。
就你的情况而言,这两个因素是结合在一起的;您连接,接收10个PUBLISH包,然后订阅(设置处理程序),但此时Mosquitto有10条消息正在传输,不会再发送了。
修复方法是添加:

opts.SetDefaultPublishHandler(func(mqtt.Client, mqtt.Message) {})

这将添加默认的发布处理程序(忽略消息);处理程序存在的事实意味着PUBLISH分组将被确认。
ClientA是否需要在离线时接收消息(它取消订阅了telemetry/+,所以无论如何都不会收到这些消息)。如果不需要,那么使用opts.SetCleanSession(true)是避免这个问题的另一种方法。
如果您确实需要处理消息,那么在连接之前使用AddRoute来配置消息处理程序(我通常有一个捕获所有消息的DefaultPublishHandler,它只记录消息,以便我可以看到遗漏了什么)。

相关问题