Why does libmosquitto close the connection after publishing the first message to RabbitMQ used as an MQTT broker?

czq61nw1  posted on 2023-03-18  in RabbitMQ

I am doing this in WSL on Windows; I don't know whether that is relevant.
I created a simple broker with Docker Compose:

version: "3.9"

services:
  # Create service with RabbitMQ.
  broker:
    image: rabbitmq:3-management
    ports:
      - "5672:5672"
      - "1883:1883" # For MQTT
      - "8080:15672" # For the RabbitMQ management UI
    volumes:
      - ./rabbitmq/:/etc/rabbitmq/:ro

The ./rabbitmq/ folder contains a basic configuration:
rabbitmq.conf

log.file.level = debug
loopback_users.guest = false
management.load_definitions = /etc/rabbitmq/definitions.json

# MQTT config
mqtt.vhost    = /
mqtt.allow_anonymous = false

definitions.json

{
    "users": [
        {
            "name": "guest",
            "password": "guest",
            "tags": "administrator"
        },
        {
            "name": "mqtt",
            "password": "mqtt",
            "tags": "management"
        }
    ],
    "vhosts": [
        {
            "name": "/"
        }
    ],
    "permissions": [
        {
            "user": "mqtt",
            "vhost": "/",
            "configure": ".*",
            "write": ".*",
            "read": ".*"
        }
    ],
    "queues": [
        {
            "name": "mqtt",
            "vhost": "/",
            "durable": true,
            "auto_delete": false,
            "arguments": {}
        }
    ]
}

Enabled plugins:

[rabbitmq_federation_management,rabbitmq_management,rabbitmq_mqtt].

In another container, I installed the libmosquitto library and compiled a small example taken from the mosquitto repo:

/*
 * This example shows how to publish messages from outside of the Mosquitto network loop.
 */

#include <mosquitto.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

/* Callback called when the client receives a CONNACK message from the broker. */
void on_connect(struct mosquitto *mosq, void *obj, int reason_code)
{
    printf("on_connect: %s\n", mosquitto_connack_string(reason_code));
    if(reason_code != 0){
        mosquitto_disconnect(mosq);
    }

    /* You may wish to set a flag here to indicate to your application that the
     * client is now connected. */
}

void on_publish(struct mosquitto *mosq, void *obj, int mid)
{
    printf("Message with mid %d has been published.\n", mid);
}

int get_temperature(void)
{
    sleep(1); /* Prevent a storm of messages - this pretend sensor works at 1Hz */
    return random()%100;
}

/* This function pretends to read some data from a sensor and publish it.*/
void publish_sensor_data(struct mosquitto *mosq)
{
    char payload[20];
    int temp;
    int rc;

    /* Get our pretend data */
    temp = get_temperature();
    /* Print it to a string for easy human reading - payload format is highly
     * application dependent. */
    snprintf(payload, sizeof(payload), "%d", temp);

    rc = mosquitto_publish(mosq, NULL, "example/temperature", strlen(payload), payload, 2, false);
    if(rc != MOSQ_ERR_SUCCESS){
        fprintf(stderr, "Error publishing: %s\n", mosquitto_strerror(rc));
    }
}

int main(int argc, char *argv[])
{
    struct mosquitto *mosq;
    int rc;

    /* Required before calling other mosquitto functions */
    mosquitto_lib_init();

    mosq = mosquitto_new(NULL, true, NULL);
    if(mosq == NULL){
        fprintf(stderr, "Error: Out of memory.\n");
        return 1;
    }

    /* Set the user name and password defined in the broker configuration */
    mosquitto_username_pw_set(mosq, "mqtt", "mqtt");
    /* Configure callbacks. This should be done before connecting ideally. */
    mosquitto_connect_callback_set(mosq, on_connect);
    mosquitto_publish_callback_set(mosq, on_publish);

    rc = mosquitto_connect(mosq, "host.docker.internal", 1883, 60);
    if(rc != MOSQ_ERR_SUCCESS){
        mosquitto_destroy(mosq);
        fprintf(stderr, "Error: %s\n", mosquitto_strerror(rc));
        return 1;
    }

    /* Run the network loop in a background thread, this call returns quickly. */
    rc = mosquitto_loop_start(mosq);
    if(rc != MOSQ_ERR_SUCCESS){
        mosquitto_destroy(mosq);
        fprintf(stderr, "Error: %s\n", mosquitto_strerror(rc));
        return 1;
    }

    while(1){
        publish_sensor_data(mosq);
    }

    mosquitto_lib_cleanup();
    return 0;
}

The program output is:

root@a6c0bc2147da:/src# ./a.out
on_connect: Connection Accepted.
[... nothing more appears ...]

The RabbitMQ log is:

2023-03-14 20:14:39.346965+00:00 [debug] <0.1589.0> MQTT accepting TCP connection <0.1589.0> (172.22.0.1:57814 -> 172.22.0.2:1883)
2023-03-14 20:14:39.347458+00:00 [debug] <0.1589.0> Received a CONNECT, client ID: "mosq-hdlnBrkmJ6xRctY5t3" (expanded to "mosq-hdlnBrkmJ6xRctY5t3"), username: "mqtt", clean session: true, protocol version: 4, keepalive: 60
2023-03-14 20:14:39.347658+00:00 [debug] <0.1589.0> MQTT vhost picked using plugin configuration or default
2023-03-14 20:14:39.348422+00:00 [debug] <0.1593.0> User 'mqtt' authenticated successfully by backend rabbit_auth_backend_internal
2023-03-14 20:14:39.351230+00:00 [info] <0.1589.0> accepting MQTT connection <0.1589.0> (172.22.0.1:57814 -> 172.22.0.2:1883, client id: mosq-hdlnBrkmJ6xRctY5t3)
2023-03-14 20:14:40.351415+00:00 [info] <0.1589.0> MQTT connection "172.22.0.1:57814 -> 172.22.0.2:1883" will terminate because peer closed TCP connection
2023-03-14 20:14:40.351534+00:00 [debug] <0.1589.0> MQTT: about to send will message (if any) on connection "172.22.0.1:57814 -> 172.22.0.2:1883"
2023-03-14 20:14:40.352099+00:00 [debug] <0.1635.0> Closing all channels from connection '172.22.0.1:57814 -> 172.22.0.2:1883' because it has been closed

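If I change the address from host.docker.internal to test.mosquitto.org, it works, and the publish callback is called every second.
I have tried several combinations of mosquitto_connect (including the async variant) and mosquitto_loop (including mosquitto_loop_start), and I have also set the keepalive timeout.
I don't understand why this error occurs. Can anyone help me?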

Answer 1, by pn9klfpd

I found this in the RabbitMQ MQTT plugin documentation:

RabbitMQ does not support QoS2 subscriptions. RabbitMQ automatically downgrades QoS 2 publishes and subscribes to QoS 1. Messages published as QoS 2 will be sent to subscribers as QoS 1.
Subscriptions with QoS 2 will be downgraded to QoS1 during SUBSCRIBE request (SUBACK responses will contain the actually provided QoS level).

To make it work, you need to publish messages to RabbitMQ with QoS 1. I'm not sure whether this is a bug or a feature.
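
A minimal sketch of that workaround, assuming the highest QoS the broker should be asked for is 1, as the documentation quoted above suggests. The names BROKER_MAX_QOS and clamp_qos are hypothetical and are not part of libmosquitto; the helper only caps the QoS value before it is handed to the publish call.

```c
/* Assumption: the highest QoS to request from the broker when publishing.
 * The value 1 is taken from the RabbitMQ MQTT documentation quoted above. */
#define BROKER_MAX_QOS 1

/* Hypothetical helper (not a libmosquitto function): clamp a requested
 * QoS into the range [0, BROKER_MAX_QOS]. */
int clamp_qos(int requested)
{
    if (requested < 0) {
        return 0;
    }
    if (requested > BROKER_MAX_QOS) {
        return BROKER_MAX_QOS;
    }
    return requested;
}
```

In the example from the question, the publish call would then become mosquitto_publish(mosq, NULL, "example/temperature", strlen(payload), payload, clamp_qos(2), false), which sends the message with QoS 1 instead of QoS 2. Simply replacing the literal 2 with 1 has the same effect; the helper is only useful if the same client code also talks to brokers that accept QoS 2.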
