kafka mqtt源连接器未接收从mqtt发布的数据,并因“java.lang.nullpointerexception”而失败

cgfeq70w  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(319)

我是kafka的新手,正在研究kafka mqtt源连接器。下面是两个不同的示例来实现mqtt源连接器。
请检查以下详细信息。
参考文献1:https://johanvandevenne.github.io/kafka-connect-mqtt/
kafka mqtt源连接器

http://localhost:8083/connectors \
 -H 'Content-Type: application/json' \
 -d '{ "name": "mqtt-source-connector",
   "config":
   {
     "connector.class":"be.jovacon.kafka.connect.MQTTSourceConnector",
     "mqtt.topic":"temperature",
     "kafka.topic":"mqtt.",
     "mqtt.clientID":"my_client_id",
     "mqtt.broker":"tcp://127.0.0.1:1883",
     "key.converter":"org.apache.kafka.connect.storage.StringConverter",
     "key.converter.schemas.enable":false,
     "value.converter":"org.apache.kafka.connect.storage.StringConverter",
     "value.converter.schemas.enable":false
   }
}'

参考文献2:https://github.com/kaiwaehner/kafka-connect-iot-mqtt-connector-example/blob/master/live-demo-kafka-connect-iot-mqtt-connector.adoc

curl -s -X POST -H 'Content-Type: application/json' http://localhost:8083/connectors -d '{
    "name" : "mqtt-source",
"config" : {
    "connector.class" : "io.confluent.connect.mqtt.MqttSourceConnector",
    "tasks.max" : "1",
    "mqtt.server.uri" : "tcp://127.0.0.1:1883",
    "mqtt.topics" : "temperature",
    "kafka.topic" : "mqtt.",
    "confluent.topic.bootstrap.servers": "localhost:9092",
    "confluent.topic.replication.factor": "1",
    "confluent.license":""
    }
}'

当我执行上述示例并检查连接器状态时,它显示我的运行方式如下:
curl http://localhost:8083/connectors/mqtt source/status | python-m json.tool
%总接收百分比%xferd平均速度时间当前数据加载上载总花费左速度100 185 100 185 0 10277 0--:--:--:--:--:--:--10277{“connector”:{“state”:“running”,“worker\u id”:“127.0.1.1:8083”},“name”:“mqtt source”,“tasks”:[{“id”:0,“state”:“running”,“worker\u id”:“127.0.1.1:8083”},“type”:“source}
但是当我从mqtt主题发布数据时,kafka消费者主题不会接收到它。
consumer:kafka-console-consumer.sh—引导服务器localhost:9092 --topic mqtt公司--从一开始
mqtt发布主题:mosquittoŠpub-h 127.0.0.1-p 1883-t temperature-q 2-m“99999,2.10Š”
当我从mqtt发布数据时,两个kafka mqtt连接器都给出以下错误:
{“connector”:{“state”:“running”,“worker\u id”:“127.0.1.1:8083”},“name”:“mqtt source”,“tasks”:[{“id”:0,“state”:“failed”,“trace”:“java.lang.nullpointerexception\n\tat org.apache.kafka.connect.runtime.workersourcetask.convertheaderfor(workersourcetask)。java:296)\org.apache.kafka.connect.runtime.workersourcetask.sendrecords(workersourcetask。java:226)\n\t org.apache.kafka.connect.runtime.workersourcetask.execute(workersourcetask。java:194)\否\torg.apache.kafka.connect.runtime.workertask.dorun(工作任务。java:170)\n\t org.apache.kafka.connect.runtime.workertask.run(workertask。java:214)\n\t java.base/java.util.concurrent.executors$runnableadapter.call(executors。java:515)\n\tjava.base/java.util.concurrent.futuretask.run(futuretask。java:264)\否\tjava.base/java.util.concurrent.threadpoolexecutor.runworker(threadpoolexecutor。java:1128)\n\t java.base/java.util.concurrent.threadpoolexecutor$worker.run(threadpoolexecutor。java:628)\n\tat java.base/java.lang.thread.run(线程。java:834)\n,“worker\u id”:“127.0.1.1:8083”}],“type”:“source”}
连接器属性有问题吗?或者别的什么。。?
提前谢谢

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题