org.apache.kafka.connect.errors.dataexception:数组默认值的json无效:“null”

q3aa0525  于 2021-06-06  发布在  Kafka
关注(0)|答案(2)|浏览(444)

我正在尝试使用confluent-4.1.1使用confluent kafka s3连接器。
s3Flume

"value.converter.schema.registry.url": "http://localhost:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter": "org.apache.kafka.connect.storage.StringConverter"

为s3接收器运行kafka连接器时,会收到以下错误消息:

ERROR WorkerSinkTask{id=singular-s3-sink-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:172)
org.apache.kafka.connect.errors.DataException: Invalid JSON for array default value: "null"
        at io.confluent.connect.avro.AvroData.defaultValueFromAvro(AvroData.java:1649)
        at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1562)
        at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1443)
        at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1443)
        at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1323)
        at io.confluent.connect.avro.AvroData.toConnectData(AvroData.java:1047)
        at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:87)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:468)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:301)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:205)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:173)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

我的模式只包含1个数组类型字段,其模式如下

{"name":"item_id","type":{"type":"array","items":["null","string"]},"default":[]}

我可以使用kafka avro console consumer命令查看反序列化的消息。我见过类似的问题,但在他的情况下,他使用avro序列化器的关键也。

./confluent-4.1.1/bin/kafka-avro-console-consumer --topic singular_custom_postback --bootstrap-server localhost:9092  -max-messages 2

"item_id":[{"string":"15552"},{"string":"37810"},{"string":"38061"}]
"item_id":[]

我无法将从控制台使用者获得的全部输出放入,因为它包含敏感的用户信息,因此我在模式中添加了惟一的array type字段。
提前谢谢。

1tu0hz3e

1tu0hz3e1#

这个 io.confluent.connect.avro.AvroData.defaultValueFromAvro(AvroData.java:1649) 为将您读取的消息的avro模式转换为connect sink的内部模式而调用。我相信这与你的信息数据无关。这就是为什么 AbstractKafkaAvroDeserializer 可以成功反序列化您的消息(例如,通过 kafka-avro-console-consumer ),因为您的消息是有效的avro消息。如果默认值为 null ,而 null 不是字段的有效值。例如

{
   "name":"item_id",
   "type":{
      "type":"array",
      "items":[
         "string"
      ]
   },
   "default": null
}

我建议你远程调试connect,看看到底是什么失败了。

izkcnapc

izkcnapc2#

与您链接的问题相同的问题。
在源代码中,您可以看到这个条件。

case ARRAY: {
    if (!jsonValue.isArray()) {
      throw new DataException("Invalid JSON for array default value: " + jsonValue.toString());
    }

当您的案例中的模式类型定义为 type:"array" ,但负载本身有一个 null 值(或任何其他值类型),而不是实际的数组,不管您将什么定义为模式默认值。默认值仅在 items 元素根本不存在,不是什么时候 "items":null 除此之外,我建议使用这样的模式,即记录对象,而不仅仅是命名数组,默认值为空数组,而不是 null .

{
  "type" : "record",
  "name" : "Items",
  "namespace" : "com.example.avro",
  "fields" : [ {
    "name" : "item_id",
    "type" : {
      "type" : "array",
      "items" : [ "null", "string" ]
    },
    "default": []
  } ]
}

相关问题