debezium postgres接收器连接器无法插入日期类型的值

fv2wmkja  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(450)

在设置了源连接器和接收器连接器之后,我发现 DATE 键入postgres列。
错误:列“foo”的类型为date,但表达式的类型为integer
我检查了avro模式,看到了那个列 foo 已序列化为 io.debezium.time.Date ```
{
"default": null,
"name": "foo",
"type": [
"null",
{
"connect.name": "io.debezium.time.Date",
"connect.version": 1,
"type": "int"
}
]
}

我应该怎么做才能让接收器连接器正确地插入这个值(例如 `DATE` ,不是 `INTEGER` )?
完整堆栈跟踪:

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:560)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.ConnectException: java.sql.SQLException: java.sql.BatchUpdateException: Batch entry 0 INSERT INTO "test_table" ("id","foo") VALUES (75046,18577) ON CONFLICT ("id") DO UPDATE SET "foo"=EXCLUDED."foo" was aborted: ERROR: column "foo" is of type date but expression is of type integer
Hint: You will need to rewrite or cast the expression.
Position: 249 Call getNextException to see other errors in the batch.
org.postgresql.util.PSQLException: ERROR: column "foo" is of type date but expression is of type integer
Hint: You will need to rewrite or cast the expression.
Position: 249
org.postgresql.util.PSQLException: ERROR: column "foo" is of type date but expression is of type integer
Hint: You will need to rewrite or cast the expression.
Position: 249

at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:89)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:538)
... 10 more

Caused by: java.sql.SQLException: java.sql.BatchUpdateException: Batch entry 0 INSERT INTO "test_table" ("id","foo") VALUES (75046,18577) ON CONFLICT ("id") DO UPDATE SET "foo"=EXCLUDED."foo" was aborted: ERROR: column "foo" is of type date but expression is of type integer
Hint: You will need to rewrite or cast the expression.
Position: 249 Call getNextException to see other errors in the batch.
org.postgresql.util.PSQLException: ERROR: column "foo" is of type date but expression is of type integer
Hint: You will need to rewrite or cast the expression.
Position: 249
org.postgresql.util.PSQLException: ERROR: column "foo" is of type date but expression is of type integer
Hint: You will need to rewrite or cast the expression.
Position: 249

... 12 more
源配置:

{
"name": "dbz-source-test-1",
"config": {
"name":"dbz-source-test-1",
"connector.class":"io.debezium.connector.postgresql.PostgresConnector",
"database.hostname":"some.host",
"database.port":"5432",
"database.user":"test_debezium",
"database.password":"password",
"database.dbname":"dbname",
"plugin.name":"wal2json_rds",
"slot.name":"wal2json_rds",
"database.server.name":"server_test",
"table.whitelist":"public.test_table",
"transforms":"route",
"transforms.route.type":"org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex":"([^.]+)\.([^.]+)\.([^.]+)",
"transforms.route.replacement":"dbz_source_$3",
"topic.selection.strategy":"topic_per_table",
"include.unknown.datatypes":true,
"decimal.handling.mode":"double",
"snapshot.mode":"never"
}
}

接收器配置:

{
"name": "dbz-sink-test-1",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"config.providers" : "file",
"config.providers.file.class" : "org.apache.kafka.common.config.provider.FileConfigProvider",
"config.providers.file.param.secrets" : "/opt/mysecrets",
"topics": "dbz_source_test_table",
"connection.url": "someurl",
"connection.user": "${file:/opt/mysecrets.properties:user}",
"connection.password" : "${file:/opt/mysecrets.properties:pass}",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
"table.name.format": "dbz_source_",
"insert.mode": "upsert",
"pk.field": "id",
"pk.mode": "record_value"
}
}

new9mtju

new9mtju1#

我修复了切换电源连接器的问题 time.precision.mode 配置到 connect 当time.precision.mode配置属性设置为connect时,连接器将使用预定义的kafka connect逻辑类型。当使用者只知道内置的kafka connect逻辑类型并且无法处理可变精度时间值时,这可能很有用。
在序列化类型变得不同之后:

{
    "default": null,
    "name": "foo",
    "type": [
        "null",
        {
            "connect.name": "org.apache.kafka.connect.data.Date",
            "connect.version": 1,
            "logicalType": "date",
            "type": "int"
        }
    ]
}

接收器连接器知道 org.apache.kafka.connect.data.Date 键入并正确插入。

相关问题