在mqtt souce连接器上设置kafka主题

hgc7kmma  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(385)

我使用的是confluent platform 5.3.1,我在分布式模式下定义了两个不同的mqtt源连接器,使用:

curl -s -X POST -H 'Content-Type: application/json' http://localhost:8083/connectors -d '{
"name" : "mqtt-source1",
"config" : {
"connector.class" : "io.confluent.connect.mqtt.MqttSourceConnector",
"tasks.max" : "1",
"mqtt.server.uri" : "<IP-ADDRESS 1>",
"mqtt.topics" : "<TOPIC MQTT 1>",
"kafka.topics" : "mqtt1",
"value.converter":"org.apache.kafka.connect.converters.ByteArrayConverter",
"key.converter":"org.apache.kafka.connect.storage.StringConverter",
"key.converter.schemas.enable" : "false",
"value.converter.schemas.enable" : "false",
"confluent.topic.bootstrap.servers": "localhost:9092",
"confluent.topic.replication.factor": "1",
"confluent.license":""
}
}'

curl -s -X POST -H 'Content-Type: application/json' http://localhost:8083/connectors -d '{
"name" : "mqtt-source2",
"config" : {
"connector.class" : "io.confluent.connect.mqtt.MqttSourceConnector",
"tasks.max" : "1",
"mqtt.server.uri" : "<IP-ADDRESS 2>",
"mqtt.topics" : "<TOPIC MQTT 2>",
"kafka.topics" : "mqtt2",
"value.converter":"org.apache.kafka.connect.converters.ByteArrayConverter",
"key.converter":"org.apache.kafka.connect.storage.StringConverter",
"key.converter.schemas.enable" : "false",
"value.converter.schemas.enable" : "false",
"confluent.topic.bootstrap.servers": "localhost:9092",
"confluent.topic.replication.factor": "1",
"confluent.license":""
}
}'

我有一些问题:
1) 查看连接器的状态,我得到了两个连接器相同的结果(下面我给出了一个响应示例):

{
"name": "mqtt-source1",
"connector": {
  "state": "RUNNING",
  "worker_id": "127.0.0.1:8083"
},
"tasks": [
  {
   "id": 0,
   "state": "RUNNING",
   "worker_id": "127.0.0.1:8083"
 }
],
  "type": "source"
}

2) 当我创建第一个连接器时,会在kafka上自动创建主题“mqtt”。就我个人而言,我将在kafka上创建两个不同的主题(即“mqtt1”和“mqtt2”),设置在两个连接器中,但是我无法从我创建的主题中读取任何数据。为什么?ip和mqtt主题在这两个连接器中是不同的。
提前谢谢。

hc8w905p

hc8w905p1#

配置属性为 kafka.topic 不是 kafka.topics .
既然你指定了 kafka.topics 连接器采用默认值,即 mqtt .
参考:配置属性

相关问题