I'm trying to set up a Kafka Connect JDBC source connector against PostgreSQL, but I'm getting the following error:
org.apache.kafka.connect.errors.DataException: Conversion error: null value for field that is required and has no default value
at org.apache.kafka.connect.json.JsonConverter.convertToJson(JsonConverter.java:556)
at org.apache.kafka.connect.json.JsonConverter.convertToJson(JsonConverter.java:650)
at org.apache.kafka.connect.json.JsonConverter.convertToJsonWithEnvelope(JsonConverter.java:537)
at org.apache.kafka.connect.json.JsonConverter.fromConnectData(JsonConverter.java:290)
at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:198)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:168)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
My configuration is as follows:
{
  "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
  "incrementing.column.name": "id",
  "tasks.max": "1",
  "query": "SELECT * FROM comments",
  "table.whitelist": "comments",
  "mode": "incrementing",
  "key.converter.schemas.enable": "true",
  "topic.prefix": "comments_topic",
  "value.converter.schemas.enable": "true",
  "name": "JdbcSourceConnector",
  "connection.url": "jdbc:postgresql://db:5432/my-db?user=postgres&password=password",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "key.converter": "org.apache.kafka.connect.json.JsonConverter"
}
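For reference, I create the connector by PUTting this config to the Connect worker's REST API. The command below assumes the worker is on the default port 8083 and that the JSON above is saved as jdbc-source.json (both are just placeholders for my setup):

curl -X PUT -H "Content-Type: application/json" \
  --data @jdbc-source.json \
  http://localhost:8083/connectors/JdbcSourceConnector/config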
Do you have any idea what the problem is?
Sometimes I also get this error instead:
org.apache.kafka.connect.errors.DataException: Found null value for non-optional schema
at io.confluent.connect.avro.AvroData.validateSchemaValue(AvroData.java:981)
at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:359)
at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:532)
at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:324)
at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:76)
at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:198)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:168)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
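This second stack trace comes from runs where I swap the JsonConverter settings above for Confluent's AvroConverter, roughly like this (the Schema Registry address here is a placeholder for my real one):

{
  "key.converter": "io.confluent.connect.avro.AvroConverter",
  "key.converter.schema.registry.url": "http://schema-registry:8081",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "http://schema-registry:8081"
}

In both cases the complaint is the same: a null value for a field whose schema is marked as required/non-optional.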