在设置了源连接器和接收器连接器之后,我发现 DATE
键入postgres列。
错误:列“foo”的类型为date,但表达式的类型为integer
我检查了avro模式,看到了那个列 foo
已序列化为 io.debezium.time.Date
```
{
"default": null,
"name": "foo",
"type": [
"null",
{
"connect.name": "io.debezium.time.Date",
"connect.version": 1,
"type": "int"
}
]
}
我应该怎么做才能让接收器连接器正确地插入这个值(例如 `DATE` ,不是 `INTEGER` )?
完整堆栈跟踪:
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:560)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.ConnectException: java.sql.SQLException: java.sql.BatchUpdateException: Batch entry 0 INSERT INTO "test_table" ("id","foo") VALUES (75046,18577) ON CONFLICT ("id") DO UPDATE SET "foo"=EXCLUDED."foo" was aborted: ERROR: column "foo" is of type date but expression is of type integer
Hint: You will need to rewrite or cast the expression.
Position: 249 Call getNextException to see other errors in the batch.
org.postgresql.util.PSQLException: ERROR: column "foo" is of type date but expression is of type integer
Hint: You will need to rewrite or cast the expression.
Position: 249
org.postgresql.util.PSQLException: ERROR: column "foo" is of type date but expression is of type integer
Hint: You will need to rewrite or cast the expression.
Position: 249
at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:89)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:538)
... 10 more
Caused by: java.sql.SQLException: java.sql.BatchUpdateException: Batch entry 0 INSERT INTO "test_table" ("id","foo") VALUES (75046,18577) ON CONFLICT ("id") DO UPDATE SET "foo"=EXCLUDED."foo" was aborted: ERROR: column "foo" is of type date but expression is of type integer
Hint: You will need to rewrite or cast the expression.
Position: 249 Call getNextException to see other errors in the batch.
org.postgresql.util.PSQLException: ERROR: column "foo" is of type date but expression is of type integer
Hint: You will need to rewrite or cast the expression.
Position: 249
org.postgresql.util.PSQLException: ERROR: column "foo" is of type date but expression is of type integer
Hint: You will need to rewrite or cast the expression.
Position: 249
... 12 more
源配置:
{
"name": "dbz-source-test-1",
"config": {
"name":"dbz-source-test-1",
"connector.class":"io.debezium.connector.postgresql.PostgresConnector",
"database.hostname":"some.host",
"database.port":"5432",
"database.user":"test_debezium",
"database.password":"password",
"database.dbname":"dbname",
"plugin.name":"wal2json_rds",
"slot.name":"wal2json_rds",
"database.server.name":"server_test",
"table.whitelist":"public.test_table",
"transforms":"route",
"transforms.route.type":"org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex":"([^.]+)\.([^.]+)\.([^.]+)",
"transforms.route.replacement":"dbz_source_$3",
"topic.selection.strategy":"topic_per_table",
"include.unknown.datatypes":true,
"decimal.handling.mode":"double",
"snapshot.mode":"never"
}
}
接收器配置:
{
"name": "dbz-sink-test-1",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"config.providers" : "file",
"config.providers.file.class" : "org.apache.kafka.common.config.provider.FileConfigProvider",
"config.providers.file.param.secrets" : "/opt/mysecrets",
"topics": "dbz_source_test_table",
"connection.url": "someurl",
"connection.user": "${file:/opt/mysecrets.properties:user}",
"connection.password" : "${file:/opt/mysecrets.properties:pass}",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
"table.name.format": "dbz_source_",
"insert.mode": "upsert",
"pk.field": "id",
"pk.mode": "record_value"
}
}
1条答案
按热度按时间new9mtju1#
我修复了切换电源连接器的问题
time.precision.mode
配置到connect
当time.precision.mode配置属性设置为connect时,连接器将使用预定义的kafka connect逻辑类型。当使用者只知道内置的kafka connect逻辑类型并且无法处理可变精度时间值时,这可能很有用。在序列化类型变得不同之后:
接收器连接器知道
org.apache.kafka.connect.data.Date
键入并正确插入。