我使用的是confluent platform 5.3.1,我在分布式模式下定义了两个不同的mqtt源连接器,使用:
curl -s -X POST -H 'Content-Type: application/json' http://localhost:8083/connectors -d '{
"name" : "mqtt-source1",
"config" : {
"connector.class" : "io.confluent.connect.mqtt.MqttSourceConnector",
"tasks.max" : "1",
"mqtt.server.uri" : "<IP-ADDRESS 1>",
"mqtt.topics" : "<TOPIC MQTT 1>",
"kafka.topics" : "mqtt1",
"value.converter":"org.apache.kafka.connect.converters.ByteArrayConverter",
"key.converter":"org.apache.kafka.connect.storage.StringConverter",
"key.converter.schemas.enable" : "false",
"value.converter.schemas.enable" : "false",
"confluent.topic.bootstrap.servers": "localhost:9092",
"confluent.topic.replication.factor": "1",
"confluent.license":""
}
}'
和
curl -s -X POST -H 'Content-Type: application/json' http://localhost:8083/connectors -d '{
"name" : "mqtt-source2",
"config" : {
"connector.class" : "io.confluent.connect.mqtt.MqttSourceConnector",
"tasks.max" : "1",
"mqtt.server.uri" : "<IP-ADDRESS 2>",
"mqtt.topics" : "<TOPIC MQTT 2>",
"kafka.topics" : "mqtt2",
"value.converter":"org.apache.kafka.connect.converters.ByteArrayConverter",
"key.converter":"org.apache.kafka.connect.storage.StringConverter",
"key.converter.schemas.enable" : "false",
"value.converter.schemas.enable" : "false",
"confluent.topic.bootstrap.servers": "localhost:9092",
"confluent.topic.replication.factor": "1",
"confluent.license":""
}
}'
我有一些问题:
1) 查看连接器的状态,我得到了两个连接器相同的结果(下面我给出了一个响应示例):
{
"name": "mqtt-source1",
"connector": {
"state": "RUNNING",
"worker_id": "127.0.0.1:8083"
},
"tasks": [
{
"id": 0,
"state": "RUNNING",
"worker_id": "127.0.0.1:8083"
}
],
"type": "source"
}
2) 当我创建第一个连接器时,会在kafka上自动创建主题“mqtt”。就我个人而言,我将在kafka上创建两个不同的主题(即“mqtt1”和“mqtt2”),设置在两个连接器中,但是我无法从我创建的主题中读取任何数据。为什么?ip和mqtt主题在这两个连接器中是不同的。
提前谢谢。
1条答案
按热度按时间hc8w905p1#
配置属性为
kafka.topic
不是kafka.topics
.既然你指定了
kafka.topics
连接器采用默认值,即mqtt
.参考:配置属性