我正在尝试使用confluent-4.1.1使用confluent kafka s3连接器。
s3Flume
"value.converter.schema.registry.url": "http://localhost:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter": "org.apache.kafka.connect.storage.StringConverter"
为s3接收器运行kafka连接器时,会收到以下错误消息:
ERROR WorkerSinkTask{id=singular-s3-sink-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:172)
org.apache.kafka.connect.errors.DataException: Invalid JSON for array default value: "null"
at io.confluent.connect.avro.AvroData.defaultValueFromAvro(AvroData.java:1649)
at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1562)
at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1443)
at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1443)
at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1323)
at io.confluent.connect.avro.AvroData.toConnectData(AvroData.java:1047)
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:87)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:468)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:301)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:205)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:173)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
我的模式只包含1个数组类型字段,其模式如下
{"name":"item_id","type":{"type":"array","items":["null","string"]},"default":[]}
我可以使用kafka avro console consumer命令查看反序列化的消息。我见过类似的问题,但在他的情况下,他使用avro序列化器的关键也。
./confluent-4.1.1/bin/kafka-avro-console-consumer --topic singular_custom_postback --bootstrap-server localhost:9092 -max-messages 2
"item_id":[{"string":"15552"},{"string":"37810"},{"string":"38061"}]
"item_id":[]
我无法将从控制台使用者获得的全部输出放入,因为它包含敏感的用户信息,因此我在模式中添加了惟一的array type字段。
提前谢谢。
2条答案
按热度按时间1tu0hz3e1#
这个
io.confluent.connect.avro.AvroData.defaultValueFromAvro(AvroData.java:1649)
为将您读取的消息的avro模式转换为connect sink的内部模式而调用。我相信这与你的信息数据无关。这就是为什么AbstractKafkaAvroDeserializer
可以成功反序列化您的消息(例如,通过kafka-avro-console-consumer
),因为您的消息是有效的avro消息。如果默认值为null
,而null
不是字段的有效值。例如我建议你远程调试connect,看看到底是什么失败了。
izkcnapc2#
与您链接的问题相同的问题。
在源代码中,您可以看到这个条件。
当您的案例中的模式类型定义为
type:"array"
,但负载本身有一个null
值(或任何其他值类型),而不是实际的数组,不管您将什么定义为模式默认值。默认值仅在items
元素根本不存在,不是什么时候"items":null
除此之外,我建议使用这样的模式,即记录对象,而不仅仅是命名数组,默认值为空数组,而不是null
.