Kafka JDBC sink connector throws a NullPointerException for messages whose schema has an optional field

Posted by mmvthczy on 2021-06-07 in Kafka

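The Kafka JDBC sink connector throws a NullPointerException for messages whose schema contains the optional field "parentId". Am I missing something? I am using the out-of-the-box JsonConverter and the JDBC sink connector.
The message on the Kafka topic is: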

{
"schema":{
  "type":"struct",
  "fields":[
     {
        "field":"id",
        "type":"string"
     },
     {
        "field":"type",
        "type":"string"
     },
     {
        "field":"eventId",
        "type":"string"
     },
     {
        "field":"parentId",
        "type":"string",
        "optional":true
     },
     {
        "field":"created",
        "type":"int64",
        "name":"org.apache.kafka.connect.data.Timestamp",
        "version":1
     }
  ]
 },
 "payload":{
   "id":"asset-1",
   "type":"parcel",
   "eventId":"evt-1",
   "created":1501834166000
 }
}

The connector has these properties:

connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
connection.password=admin
topics=asset-topic
tasks.max=1
batch.size=1
auto.evolve=true
connection.user=admin
auto.create=true
connection.url=jdbc:postgresql://postgres-db:5432/fabricdb
value.converter=org.apache.kafka.connect.json.JsonConverter
pk.mode=record_value
pk.fields=id

But the JDBC sink connector fails on the optional field parentId:

org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:514)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:491)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
    Caused by: java.lang.NullPointerException
        at org.apache.kafka.connect.json.JsonConverter.convertToConnect(JsonConverter.java:698)
        at org.apache.kafka.connect.json.JsonConverter.access$000(JsonConverter.java:61)
        at org.apache.kafka.connect.json.JsonConverter$12.convert(JsonConverter.java:181)
        at org.apache.kafka.connect.json.JsonConverter.convertToConnect(JsonConverter.java:742)
        at org.apache.kafka.connect.json.JsonConverter.jsonToConnect(JsonConverter.java:361)
        at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:350)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:514)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
        ... 13 more
2nc8po8w (answer #1)

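According to the JsonConverter source code, the value of a field marked as optional must still be present in the message payload.
You can see this in the convertToConnect method of JsonConverter, which calls jsonValue.isNull() on the node it is given without first checking that jsonValue itself is a non-null reference: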

private static Object convertToConnect(Schema schema, JsonNode jsonValue) {
    final Schema.Type schemaType;
    if (schema != null) {
        schemaType = schema.type();
        if (jsonValue.isNull()) {
            if (schema.defaultValue() != null)
                return schema.defaultValue(); // any logical type conversions should already have been applied
            if (schema.isOptional())
                return null;
            throw new DataException("Invalid null value for required " + schemaType +  " field");
        }
    }

In summary, if the schema contains a field such as this one from your case:

{
    "field":"parentId",
    "type":"string",
    "optional":true
 }

then its value has to be in the message payload. It can be null, but the key has to be present.
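
As a sketch of what that means for the message in the question, the payload below adds an explicit null for parentId. It assumes the missing key is the only cause of the exception. Only the payload part of the message is shown, the schema part stays as in the question, and the other values are copied from the question unchanged.

```json
"payload":{
  "id":"asset-1",
  "type":"parcel",
  "eventId":"evt-1",
  "parentId":null,
  "created":1501834166000
}
```

With the key present, the converter should receive a JSON null node for the optional field instead of no node at all, so the schema.isOptional() branch in convertToConnect can return null rather than failing.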
If you look at the other side of the code, the part responsible for serialization, you can see that it adds a NullNode for null references:

private static JsonNode convertToJson(Schema schema, Object logicalValue) {
    if (logicalValue == null) {
        if (schema == null) // Any schema is valid and we don't have a default, so treat this as an optional schema
            return null;
        if (schema.defaultValue() != null)
            return convertToJson(schema, schema.defaultValue());
        if (schema.isOptional())
            return JsonNodeFactory.instance.nullNode();
        throw new DataException("Conversion error: null value for field that is required and has no default value");
    }
