我正在尝试使用kafka向不同的主题发送mqtt消息。我使用的是汇合mqttsinkconnector,连接器配置如下所示
{
"name" : "mqttSinkConnector",
"config" : {
"connector.class": "io.confluent.connect.mqtt.MqttSinkConnector",
"tasks.max": "1",
"mqtt.server.uri": "tcp://mqttServer:1883",
"topics":"mqttSink",
"mqtt.qos": "1",
"mqtt.username": "user",
"mqtt.password": "pass",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter",
"confluent.topic.bootstrap.servers": "kafkaServer:9092",
"confluent.topic.replication.factor": "1"
}}
我用不同的键向kafka的“mqttsink”主题发送测试消息,但它们最终都位于mqtt代理的“mqttsink”主题中。据我所知,它应该落在“/t1”主题中。有没有办法从Kafka内部发布到不同的mqtt主题。
我试过的测试制作人;
# python confluent-kafka producer
p.produce('mqttSink', value=testStr, key=b'/t1')
# python kafka producer
producer.send("mqttSink",key=b'/t1', value=testStr).get(timeout=30)
暂无答案!
目前还没有任何答案,快来回答吧!