Handling empty/invalid MQTT messages with Kafka Connect

0md85ypi  posted on 2023-04-29  in  Apache

I'm trying to ingest data from MQTT into Kafka. Unfortunately, some of the MQTT messages are either empty or not valid JSON. I assume this is what causes the following exception:

{
  "name": "source_mqtt_alarms",
  "connector": {
    "state": "RUNNING",
    "worker_id": "-redacted-:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "FAILED",
      "worker_id": "-redacted-:8083",
      "trace": "org.apache.kafka.connect.errors.ConnectException: 
        Tolerance exceeded in error handler\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:196)\n\t
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:122)\n\t
        at org.apache.kafka.connect.runtime.WorkerSourceTask.convertTransformedRecord(WorkerSourceTask.java:314)\n\t
        at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:340)\n\t
        at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:264)\n\t
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)\n\t
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)\n\t
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\t
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\t
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\t
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\t
        at java.base/java.lang.Thread.run(Thread.java:834)\n
        Caused by: org.apache.kafka.connect.errors.DataException: Conversion error: null value for field that is required and has no default value\n\t
        at org.apache.kafka.connect.json.JsonConverter.convertToJson(JsonConverter.java:611)\n\t
        at org.apache.kafka.connect.json.JsonConverter.convertToJsonWithEnvelope(JsonConverter.java:592)\n\t
        at org.apache.kafka.connect.json.JsonConverter.fromConnectData(JsonConverter.java:346)\n\t
        at org.apache.kafka.connect.storage.Converter.fromConnectData(Converter.java:63)\n\t
        at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$2(WorkerSourceTask.java:314)\n\t
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:146)\n\t
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:180)\n\t
        ... 11 more\n"
    }
  ],
  "type": "source"
}
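
When a task fails like this, the root cause is usually on the last `Caused by:` line of the trace rather than on its first line. A minimal Python sketch for pulling that line out, assuming the status JSON has already been fetched from the worker and parsed into a dict; the helper name `failed_task_causes` is hypothetical and the trace is abbreviated for illustration:

```python
def failed_task_causes(status: dict) -> dict:
    """Map each FAILED task id to the innermost 'Caused by:' line of its trace."""
    causes = {}
    for task in status.get("tasks", []):
        if task.get("state") != "FAILED":
            continue
        lines = [ln.strip() for ln in task.get("trace", "").splitlines()]
        caused_by = [ln for ln in lines if ln.startswith("Caused by:")]
        # Fall back to the first trace line when there is no nested cause.
        causes[task["id"]] = caused_by[-1] if caused_by else (lines[0] if lines else "")
    return causes


# Abbreviated copy of the status shown above (trace shortened for illustration).
status = {
    "name": "source_mqtt_alarms",
    "connector": {"state": "RUNNING"},
    "tasks": [
        {
            "id": 0,
            "state": "FAILED",
            "trace": (
                "org.apache.kafka.connect.errors.ConnectException: "
                "Tolerance exceeded in error handler\n"
                "\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)\n"
                "Caused by: org.apache.kafka.connect.errors.DataException: "
                "Conversion error: null value for field that is required "
                "and has no default value\n"
            ),
        }
    ],
    "type": "source",
}

print(failed_task_causes(status))
```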

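From what I have understood so far, the incoming (empty/invalid) messages do not contain a value that is declared as non-optional, which leads to the exception above.
My question is: where does the connector get this expectation from? It says "null value for field that is required and has no default value", but if the schema is (I assume) created for each message, how can the field be required?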
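
For context on the error message: JsonConverter checks the record's value against the schema that the source connector attached to the record, and a null value is accepted only when the schema is absent, optional, or has a default. The following is a simplified Python model of that check, not the actual Kafka Connect code; the `Schema` class and the `convert_to_json` function are hypothetical stand-ins:

```python
from dataclasses import dataclass
from typing import Any, Optional


class DataException(Exception):
    """Stand-in for org.apache.kafka.connect.errors.DataException."""


@dataclass
class Schema:
    """Hypothetical, heavily reduced stand-in for a Connect schema."""
    optional: bool = False
    default: Any = None


def convert_to_json(schema: Optional[Schema], value: Any) -> Any:
    """Simplified model of the null handling done before serialization."""
    if value is None:
        if schema is None:
            return None            # schemaless null is passed through
        if schema.default is not None:
            return schema.default  # fall back to the declared default
        if schema.optional:
            return None            # optional values may be null
        raise DataException(
            "Conversion error: null value for field that is required "
            "and has no default value"
        )
    return value


try:
    convert_to_json(Schema(optional=False), None)
except DataException as exc:
    print(exc)
```

In this model the "required" expectation comes from the schema carried by the individual record, not from earlier messages. Whether the connector really emits a null value together with a non-optional schema for an empty payload is an assumption here, but it would be consistent with the trace.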

**Additional information:** I'm using www.example.com . The configuration is as follows:

{
    "name": "source_mqtt_alarms",
    "config": {        
        "topics": "alarms",
        "connect.mqtt.kcql": "INSERT INTO alarms SELECT * FROM `-redacted-/+/alarms` WITHCONVERTER=`com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter`",
        "connect.mqtt.client.id": "kafka_connect_alarms",
        
        "tasks.max": 1,
        "connector.class": "com.datamountaineer.streamreactor.connect.mqtt.source.MqttSourceConnector",
        
        "connect.mqtt.service.quality": 2,
        "connect.mqtt.hosts": "ssl://-redacted-:8883",
        
        "connect.mqtt.ssl.ca.cert": "/usr/share/certs/cumu.crt",
        "connect.mqtt.ssl.cert": "/usr/share/certs/mqtt.crt",
        "connect.mqtt.ssl.key": "/usr/share/certs/mqtt.pem",

        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": true,

        "key.converter":"org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": true
    }
}

**Edit:** I just looked through the Kafka Connect worker logs, which provide more information. Right before the exception above, I found this:

[2021-05-26 08:27:19,552] ERROR Error handling message with id:0 on topic:-redacted-/alarms (com.datamountaineer.streamreactor.connect.mqtt.source.MqttManager)
java.util.NoSuchElementException: head of empty list
    at scala.collection.immutable.Nil$.head(List.scala:430)
    at scala.collection.immutable.Nil$.head(List.scala:427)
    at com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter$.convert(JsonSimpleConverter.scala:76)
    at com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter$.convert(JsonSimpleConverter.scala:70)
    at com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter.convert(JsonSimpleConverter.scala:37)
    at com.datamountaineer.streamreactor.connect.mqtt.source.MqttManager.messageArrived(MqttManager.scala:110)
    at org.eclipse.paho.client.mqttv3.internal.CommsCallback.deliverMessage(CommsCallback.java:514)
    at org.eclipse.paho.client.mqttv3.internal.CommsCallback.handleMessage(CommsCallback.java:417)
    at org.eclipse.paho.client.mqttv3.internal.CommsCallback.run(CommsCallback.java:214)
    at java.base/java.lang.Thread.run(Thread.java:834)
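
To find out which incoming messages are problematic, the two cases from the question (empty payload, invalid JSON) can be told apart before they reach the converter. A minimal Python sketch; `classify_payload` is a hypothetical helper and would need to be fed raw payloads captured from the broker by some other means:

```python
import json


def classify_payload(payload: bytes) -> str:
    """Return 'empty', 'invalid', or 'valid' for a raw MQTT payload."""
    if not payload or not payload.strip():
        return "empty"
    try:
        json.loads(payload.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        return "invalid"
    return "valid"


for sample in (b"", b"{not json", b'{"alarm": "high"}'):
    print(sample, "->", classify_payload(sample))
```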

ljo96ir51#

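It turns out that messages without a payload can easily be dropped with the Filter transform and the RecordIsTombstone predicate, as shown below (the `transforms` and `predicates` entries at the end). Note that this configuration also sets `schemas.enable` to `false` for both converters: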

{
    "name": "source_mqtt_alarms",
    "config": {        
        "topics": "alarms",
        "connect.mqtt.kcql": "INSERT INTO alarms SELECT * FROM `-redacted-/+/alarms` WITHCONVERTER=`com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter`",
        "connect.mqtt.client.id": "kafka_connect_alarms",
    
        "tasks.max": 1,
        "connector.class": "com.datamountaineer.streamreactor.connect.mqtt.source.MqttSourceConnector",
    
        "connect.mqtt.service.quality": 2,
        "connect.mqtt.hosts": "ssl://-redacted-:8883",
    
        "connect.mqtt.ssl.ca.cert": "/usr/share/certs/cumu.crt",
        "connect.mqtt.ssl.cert": "/usr/share/certs/mqtt.crt",
        "connect.mqtt.ssl.key": "/usr/share/certs/mqtt.pem",

        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": false,

        "key.converter":"org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": false,

        "transforms": "Filter",
        "transforms.Filter.type": "org.apache.kafka.connect.transforms.Filter",
        "transforms.Filter.predicate": "IsTombstone",

        "predicates": "IsTombstone",
        "predicates.IsTombstone.type": "org.apache.kafka.connect.transforms.predicates.RecordIsTombstone"
    }
}
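
The effect of this transform chain can be pictured with a small Python model: `RecordIsTombstone` matches records whose value is null, and `Filter` drops every record its predicate matches. This only illustrates the semantics and is not Kafka Connect code; the `Record` class and the function names are hypothetical:

```python
from dataclasses import dataclass
from typing import Any, Callable, Iterable, Iterator, Optional


@dataclass
class Record:
    """Hypothetical stand-in for a Connect source record."""
    topic: str
    key: Optional[str]
    value: Any


def record_is_tombstone(record: Record) -> bool:
    """Model of the predicate: true when the record value is null."""
    return record.value is None


def filter_transform(
    records: Iterable[Record], predicate: Callable[[Record], bool]
) -> Iterator[Record]:
    """Model of Filter: drop every record for which the predicate holds."""
    for record in records:
        if not predicate(record):
            yield record


incoming = [
    Record("alarms", "dev1", {"severity": "high"}),
    Record("alarms", "dev2", None),  # empty MQTT payload
    Record("alarms", "dev3", {"severity": "low"}),
]
kept = list(filter_transform(incoming, record_is_tombstone))
print([r.key for r in kept])
```

In a source task, transforms are applied before the value converter, so a dropped record never reaches JsonConverter. A payload that contains malformed JSON is not necessarily a null value, so this predicate may not cover that case.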
