Kafka Connect JDBC source: null value for field that is required and has no default value

Posted by pdsfdshx on 2021-06-04 in Kafka

I am trying to set up a Kafka Connect JDBC source connector with PostgreSQL, but I get the following error:

org.apache.kafka.connect.errors.DataException: Conversion error: null value for field that is required and has no default value
    at org.apache.kafka.connect.json.JsonConverter.convertToJson(JsonConverter.java:556)
    at org.apache.kafka.connect.json.JsonConverter.convertToJson(JsonConverter.java:650)
    at org.apache.kafka.connect.json.JsonConverter.convertToJsonWithEnvelope(JsonConverter.java:537)
    at org.apache.kafka.connect.json.JsonConverter.fromConnectData(JsonConverter.java:290)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:198)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:168)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)

The configuration is as follows:

{
  "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
  "incrementing.column.name": "id",
  "tasks.max": "1",
  "query": "SELECT * FROM comments",
  "table.whitelist": "comments",
  "mode": "incrementing",
  "key.converter.schemas.enable": "true",
  "topic.prefix": "comments_topic",
  "value.converter.schemas.enable": "true",
  "name": "JdbcSourceConnector",
  "connection.url": "jdbc:postgresql://db:5432/my-db?user=postgres&password=password",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "key.converter": "org.apache.kafka.connect.json.JsonConverter"
}

Do you know what the problem is?
Sometimes I also get this error:

org.apache.kafka.connect.errors.DataException: Found null value for non-optional schema
    at io.confluent.connect.avro.AvroData.validateSchemaValue(AvroData.java:981)
    at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:359)
    at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:532)
    at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:324)
    at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:76)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:198)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:168)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
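
For reference, both exceptions describe the same condition: a field whose schema is marked required (non-optional) carried a null value, and the schema supplied no default. The sketch below is a simplified, stdlib-only Java model of that check, not the converters' actual code. `NullCheckSketch`, `FieldSchema`, `convert`, and the `body` field are hypothetical names used only for illustration; `id` is borrowed from the configuration above.

```java
public class NullCheckSketch {

    /** Hypothetical stand-in for a field schema: name, optional flag, default value. */
    static final class FieldSchema {
        final String name;
        final boolean optional;
        final Object defaultValue;

        FieldSchema(String name, boolean optional, Object defaultValue) {
            this.name = name;
            this.optional = optional;
            this.defaultValue = defaultValue;
        }
    }

    /**
     * Returns the value that would be serialized for the field.
     * A null value is replaced by the default if one exists, kept as null
     * if the field is optional, and rejected otherwise.
     */
    static Object convert(FieldSchema schema, Object value) {
        if (value != null) {
            return value;
        }
        if (schema.defaultValue != null) {
            return schema.defaultValue;
        }
        if (schema.optional) {
            return null;
        }
        throw new IllegalStateException(
            "null value for required field '" + schema.name + "' with no default value");
    }

    public static void main(String[] args) {
        FieldSchema id = new FieldSchema("id", false, null);     // required, no default
        FieldSchema body = new FieldSchema("body", true, null);  // optional

        System.out.println(convert(id, 1L));      // non-null value passes through
        System.out.println(convert(body, null));  // optional field: null is allowed
        try {
            convert(id, null);                    // required field with null: rejected
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Read against the setup in the question, this would mean that some column returned by the query held NULL while the schema derived for it was marked required. That is an assumption; the stack traces alone do not show which field was involved.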

No answers yet.
