kafka jdbc sink connector为schema中有可选字段“parentid”的消息提供空指针异常。我错过什么了吗?我使用现成的jsonconverter和jdbc sink连接器
关于Kafka主题的信息是
{
"schema":{
"type":"struct",
"fields":[
{
"field":"id",
"type":"string"
},
{
"field":"type",
"type":"string"
},
{
"field":"eventId",
"type":"string"
},
{
"field":"parentId",
"type":"string",
"optional":true
},
{
"field":"created",
"type":"int64",
"name":"org.apache.kafka.connect.data.Timestamp",
"version":1
}
]
},
"payload":{
"id":"asset-1",
"type":"parcel",
"eventId":"evt-1",
"created":1501834166000
}
}
连接器具有这些特性
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
connection.password=admin
topics=asset-topic
tasks.max=1
batch.size=1
auto.evolve=true
connection.user=admin
auto.create=true
connection.url=jdbc:postgresql://postgres-db:5432/fabricdb
value.converter=org.apache.kafka.connect.json.JsonConverter
pk.mode=record_value
pk.fields=id
但是jdbc接收器连接器对于可选字段parentid失败
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:514)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:491)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
at org.apache.kafka.connect.json.JsonConverter.convertToConnect(JsonConverter.java:698)
at org.apache.kafka.connect.json.JsonConverter.access$000(JsonConverter.java:61)
at org.apache.kafka.connect.json.JsonConverter$12.convert(JsonConverter.java:181)
at org.apache.kafka.connect.json.JsonConverter.convertToConnect(JsonConverter.java:742)
at org.apache.kafka.connect.json.JsonConverter.jsonToConnect(JsonConverter.java:361)
at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:350)
at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:514)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
... 13 more
1条答案
按热度按时间2nc8po8w1#
根据源代码
JsonConverter
字段的值,标记为optional
必须在消息负载中。你可以在
JsonConverter
在转换的方法中初始化jsonValue to
对象`摘要如果您的案例中存在架构,则为:
值必须在消息负载中。可以为空,但必须为空。
如果你看其他网站的代码。代码,负责序列化。它补充道
NullNode
用于空引用。