我是kafka的新手,正在研究kafka mqtt源连接器。下面是两个不同的示例来实现mqtt源连接器。
请检查以下详细信息。
参考文献1:https://johanvandevenne.github.io/kafka-connect-mqtt/
kafka mqtt源连接器
http://localhost:8083/connectors \
-H 'Content-Type: application/json' \
-d '{ "name": "mqtt-source-connector",
"config":
{
"connector.class":"be.jovacon.kafka.connect.MQTTSourceConnector",
"mqtt.topic":"temperature",
"kafka.topic":"mqtt.",
"mqtt.clientID":"my_client_id",
"mqtt.broker":"tcp://127.0.0.1:1883",
"key.converter":"org.apache.kafka.connect.storage.StringConverter",
"key.converter.schemas.enable":false,
"value.converter":"org.apache.kafka.connect.storage.StringConverter",
"value.converter.schemas.enable":false
}
}'
参考文献2:https://github.com/kaiwaehner/kafka-connect-iot-mqtt-connector-example/blob/master/live-demo-kafka-connect-iot-mqtt-connector.adoc
curl -s -X POST -H 'Content-Type: application/json' http://localhost:8083/connectors -d '{
"name" : "mqtt-source",
"config" : {
"connector.class" : "io.confluent.connect.mqtt.MqttSourceConnector",
"tasks.max" : "1",
"mqtt.server.uri" : "tcp://127.0.0.1:1883",
"mqtt.topics" : "temperature",
"kafka.topic" : "mqtt.",
"confluent.topic.bootstrap.servers": "localhost:9092",
"confluent.topic.replication.factor": "1",
"confluent.license":""
}
}'
当我执行上述示例并检查连接器状态时,它显示我的运行方式如下:
curl http://localhost:8083/connectors/mqtt source/status | python-m json.tool
%总接收百分比%xferd平均速度时间当前数据加载上载总花费左速度100 185 100 185 0 10277 0--:--:--:--:--:--:--10277{“connector”:{“state”:“running”,“worker\u id”:“127.0.1.1:8083”},“name”:“mqtt source”,“tasks”:[{“id”:0,“state”:“running”,“worker\u id”:“127.0.1.1:8083”},“type”:“source}
但是当我从mqtt主题发布数据时,kafka消费者主题不会接收到它。
consumer:kafka-console-consumer.sh—引导服务器localhost:9092 --topic mqtt公司--从一开始
mqtt发布主题:mosquittoŠpub-h 127.0.0.1-p 1883-t temperature-q 2-m“99999,2.10Š”
当我从mqtt发布数据时,两个kafka mqtt连接器都给出以下错误:
{“connector”:{“state”:“running”,“worker\u id”:“127.0.1.1:8083”},“name”:“mqtt source”,“tasks”:[{“id”:0,“state”:“failed”,“trace”:“java.lang.nullpointerexception\n\tat org.apache.kafka.connect.runtime.workersourcetask.convertheaderfor(workersourcetask)。java:296)\org.apache.kafka.connect.runtime.workersourcetask.sendrecords(workersourcetask。java:226)\n\t org.apache.kafka.connect.runtime.workersourcetask.execute(workersourcetask。java:194)\否\torg.apache.kafka.connect.runtime.workertask.dorun(工作任务。java:170)\n\t org.apache.kafka.connect.runtime.workertask.run(workertask。java:214)\n\t java.base/java.util.concurrent.executors$runnableadapter.call(executors。java:515)\n\tjava.base/java.util.concurrent.futuretask.run(futuretask。java:264)\否\tjava.base/java.util.concurrent.threadpoolexecutor.runworker(threadpoolexecutor。java:1128)\n\t java.base/java.util.concurrent.threadpoolexecutor$worker.run(threadpoolexecutor。java:628)\n\tat java.base/java.lang.thread.run(线程。java:834)\n,“worker\u id”:“127.0.1.1:8083”}],“type”:“source”}
连接器属性有问题吗?或者别的什么。。?
提前谢谢
暂无答案!
目前还没有任何答案,快来回答吧!