我正在尝试将Mqtt中的数据摄取到Kafka中。不幸的是,其中一些Mqtt-Messages要么是空的,要么是无效的JSON。我假设这就是导致以下异常的原因:
{
"name": "source_mqtt_alarms",
"connector": {
"state": "RUNNING",
"worker_id": "-redacted-:8083"
},
"tasks": [
{
"id": 0,
"state": "FAILED",
"worker_id": "-redacted-:8083",
"trace": "org.apache.kafka.connect.errors.ConnectException:
Tolerance exceeded in error handler\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:196)\n\t
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:122)\n\t
at org.apache.kafka.connect.runtime.WorkerSourceTask.convertTransformedRecord(WorkerSourceTask.java:314)\n\t
at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:340)\n\t
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:264)\n\t
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)\n\t
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)\n\t
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\t
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\t
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\t
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\t
at java.base/java.lang.Thread.run(Thread.java:834)\n
Caused by: org.apache.kafka.connect.errors.DataException: Conversion error: null value for field that is required and has no default value\n\t
at org.apache.kafka.connect.json.JsonConverter.convertToJson(JsonConverter.java:611)\n\t
at org.apache.kafka.connect.json.JsonConverter.convertToJsonWithEnvelope(JsonConverter.java:592)\n\t
at org.apache.kafka.connect.json.JsonConverter.fromConnectData(JsonConverter.java:346)\n\t
at org.apache.kafka.connect.storage.Converter.fromConnectData(Converter.java:63)\n\t
at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$2(WorkerSourceTask.java:314)\n\t
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:146)\n\t
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:180)\n\t
... 11 more\n"
}
],
"type": "source"
}
从目前为止我所了解到的,看起来传入的(空/无效)消息不包含声明为非可选的值,这导致了上面的异常。
我的问题是,连接器从哪里获得这种期望?它说“null value for field that is required and has no default value”,但是如果模式是(我假设)为每条消息创建的,那么该字段是如何被要求的呢?
**其他信息:**我使用的是 www.example.com 。配置如下:
{
"name": "source_mqtt_alarms",
"config": {
"topics": "alarms",
"connect.mqtt.kcql": "INSERT INTO alarms SELECT * FROM `-redacted-/+/alarms` WITHCONVERTER=`com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter`",
"connect.mqtt.client.id": "kafka_connect_alarms",
"tasks.max": 1,
"connector.class": "com.datamountaineer.streamreactor.connect.mqtt.source.MqttSourceConnector",
"connect.mqtt.service.quality": 2,
"connect.mqtt.hosts": "ssl://-redacted-:8883",
"connect.mqtt.ssl.ca.cert": "/usr/share/certs/cumu.crt",
"connect.mqtt.ssl.cert": "/usr/share/certs/mqtt.crt",
"connect.mqtt.ssl.key": "/usr/share/certs/mqtt.pem",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": true,
"key.converter":"org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": true,
}
}
**编辑:**我刚刚浏览了Kafka Connect worker的日志,它提供了更多的信息。在上面的异常之前,我失去了这些:
[2021-05-26 08:27:19,552] ERROR Error handling message with id:0 on topic:-redacted-/alarms (com.datamountaineer.streamreactor.connect.mqtt.source.MqttManager)
java.util.NoSuchElementException: head of empty list
at scala.collection.immutable.Nil$.head(List.scala:430)
at scala.collection.immutable.Nil$.head(List.scala:427)
at com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter$.convert(JsonSimpleConverter.scala:76)
at com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter$.convert(JsonSimpleConverter.scala:70)
at com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter.convert(JsonSimpleConverter.scala:37)
at com.datamountaineer.streamreactor.connect.mqtt.source.MqttManager.messageArrived(MqttManager.scala:110)
at org.eclipse.paho.client.mqttv3.internal.CommsCallback.deliverMessage(CommsCallback.java:514)
at org.eclipse.paho.client.mqttv3.internal.CommsCallback.handleMessage(CommsCallback.java:417)
at org.eclipse.paho.client.mqttv3.internal.CommsCallback.run(CommsCallback.java:214)
at java.base/java.lang.Thread.run(Thread.java:834)
1条答案
按热度按时间ljo96ir51#
结果表明,没有有效负载的消息可以通过Filter转换和RecordIsTombstone predicate 轻松删除,如下所示:(最后6行):