I am doing this on Windows using WSL; I don't know whether that is relevant.
I created a simple broker with Docker Compose:
version: "3.9"
services:
  # Create service with RabbitMQ.
  broker:
    image: rabbitmq:3-management
    ports:
      - "5672:5672"
      - "1883:1883" # For MQTT
      - "8080:15672" # Serves the RabbitMQ management GUI
    volumes:
      - ./rabbitmq/:/etc/rabbitmq/:ro
In the folder ./rabbitmq/ there is a basic configuration:
rabbitmq.conf
log.file.level = debug
loopback_users.guest = false
management.load_definitions = /etc/rabbitmq/definitions.json
# MQTT config
mqtt.vhost = /
mqtt.allow_anonymous = false
definitions.json
{
  "users": [
    {
      "name": "guest",
      "password": "guest",
      "tags": "administrator"
    },
    {
      "name": "mqtt",
      "password": "mqtt",
      "tags": "management"
    }
  ],
  "vhosts": [
    {
      "name": "/"
    }
  ],
  "permissions": [
    {
      "user": "mqtt",
      "vhost": "/",
      "configure": ".*",
      "write": ".*",
      "read": ".*"
    }
  ],
  "queues": [
    {
      "name": "mqtt",
      "vhost": "/",
      "durable": true,
      "auto_delete": false,
      "arguments": {}
    }
  ]
}
enabled_plugins
[rabbitmq_federation_management,rabbitmq_management,rabbitmq_mqtt].
In another container I installed the libmosquitto library and compiled a small example taken from the mosquitto repo:
/*
 * This example shows how to publish messages from outside of the Mosquitto network loop.
 */
#include <mosquitto.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

/* Callback called when the client receives a CONNACK message from the broker. */
void on_connect(struct mosquitto *mosq, void *obj, int reason_code)
{
    printf("on_connect: %s\n", mosquitto_connack_string(reason_code));
    if(reason_code != 0){
        mosquitto_disconnect(mosq);
    }
    /* You may wish to set a flag here to indicate to your application that the
     * client is now connected. */
}

void on_publish(struct mosquitto *mosq, void *obj, int mid)
{
    printf("Message with mid %d has been published.\n", mid);
}

int get_temperature(void)
{
    sleep(1); /* Prevent a storm of messages - this pretend sensor works at 1Hz */
    return random()%100;
}

/* This function pretends to read some data from a sensor and publish it. */
void publish_sensor_data(struct mosquitto *mosq)
{
    char payload[20];
    int temp;
    int rc;

    /* Get our pretend data */
    temp = get_temperature();
    /* Print it to a string for easy human reading - payload format is highly
     * application dependent. */
    snprintf(payload, sizeof(payload), "%d", temp);
    rc = mosquitto_publish(mosq, NULL, "example/temperature", strlen(payload), payload, 2, false);
    if(rc != MOSQ_ERR_SUCCESS){
        fprintf(stderr, "Error publishing: %s\n", mosquitto_strerror(rc));
    }
}

int main(int argc, char *argv[])
{
    struct mosquitto *mosq;
    int rc;

    /* Required before calling other mosquitto functions */
    mosquitto_lib_init();

    mosq = mosquitto_new(NULL, true, NULL);
    if(mosq == NULL){
        fprintf(stderr, "Error: Out of memory.\n");
        return 1;
    }

    /* Set the user name and password defined in definitions.json */
    mosquitto_username_pw_set(mosq, "mqtt", "mqtt");

    /* Configure callbacks. This should be done before connecting ideally. */
    mosquitto_connect_callback_set(mosq, on_connect);
    mosquitto_publish_callback_set(mosq, on_publish);

    rc = mosquitto_connect(mosq, "host.docker.internal", 1883, 60);
    if(rc != MOSQ_ERR_SUCCESS){
        mosquitto_destroy(mosq);
        fprintf(stderr, "Error: %s\n", mosquitto_strerror(rc));
        return 1;
    }

    /* Run the network loop in a background thread, this call returns quickly. */
    rc = mosquitto_loop_start(mosq);
    if(rc != MOSQ_ERR_SUCCESS){
        mosquitto_destroy(mosq);
        fprintf(stderr, "Error: %s\n", mosquitto_strerror(rc));
        return 1;
    }

    while(1){
        publish_sensor_data(mosq);
    }

    mosquitto_lib_cleanup();
    return 0;
}
The program output is:
root@a6c0bc2147da:/src# ./a.out
on_connect: Connection Accepted.
[... nothing more appears ...]
The RabbitMQ log is:
2023-03-14 20:14:39.346965+00:00 [debug] <0.1589.0> MQTT accepting TCP connection <0.1589.0> (172.22.0.1:57814 -> 172.22.0.2:1883)
2023-03-14 20:14:39.347458+00:00 [debug] <0.1589.0> Received a CONNECT, client ID: "mosq-hdlnBrkmJ6xRctY5t3" (expanded to "mosq-hdlnBrkmJ6xRctY5t3"), username: "mqtt", clean session: true, protocol version: 4, keepalive: 60
2023-03-14 20:14:39.347658+00:00 [debug] <0.1589.0> MQTT vhost picked using plugin configuration or default
2023-03-14 20:14:39.348422+00:00 [debug] <0.1593.0> User 'mqtt' authenticated successfully by backend rabbit_auth_backend_internal
2023-03-14 20:14:39.351230+00:00 [info] <0.1589.0> accepting MQTT connection <0.1589.0> (172.22.0.1:57814 -> 172.22.0.2:1883, client id: mosq-hdlnBrkmJ6xRctY5t3)
2023-03-14 20:14:40.351415+00:00 [info] <0.1589.0> MQTT connection "172.22.0.1:57814 -> 172.22.0.2:1883" will terminate because peer closed TCP connection
2023-03-14 20:14:40.351534+00:00 [debug] <0.1589.0> MQTT: about to send will message (if any) on connection "172.22.0.1:57814 -> 172.22.0.2:1883"
2023-03-14 20:14:40.352099+00:00 [debug] <0.1635.0> Closing all channels from connection '172.22.0.1:57814 -> 172.22.0.2:1883' because it has been closed
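If I change the host from host.docker.internal to test.mosquitto.org, it works fine and the publish callback is called every second.
I have tried several combinations of mosquitto_connect (including the async variant) and mosquitto_loop (including mosquitto_loop_start), and I have also tried setting the keepalive timeout.
I don't understand why this error occurs. Can anyone help me?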
1 Answer
I found this in the RabbitMQ MQTT plugin documentation:
To make it work, you need to publish messages to RabbitMQ with QoS 1. I'm not sure whether this is a bug or a feature.
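One way to apply this in the client is to cap the QoS before publishing. The following is only a minimal sketch, assuming the broker accepts publishes at QoS 1 at most; effective_qos is a hypothetical helper introduced here for illustration and is not part of libmosquitto.

```c
/* Hypothetical helper (not part of libmosquitto): cap the requested QoS
 * at the highest level the broker is assumed to accept for publishes.
 * Negative requests are treated as QoS 0. */
static int effective_qos(int requested, int broker_max)
{
    if (requested < 0) {
        return 0;
    }
    return requested > broker_max ? broker_max : requested;
}
```

In publish_sensor_data from the question, the literal 2 passed as the QoS argument of mosquitto_publish would then become effective_qos(2, 1), which evaluates to 1. Simply hard-coding 1 in that call has the same effect; the helper only makes the assumed broker limit explicit in one place.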