postgresql kafka连接器jdbc-sink末尾有语法错误

oo7oh9g9  于 2022-11-04  发布在  PostgreSQL
关注(0)|答案(2)|浏览(127)

我有一个关于jdbc-sink的问题。
postgres 1---〉Kafka---〉postgres 2
生成器工作正常,但使用者出错:
连接_1| org.apache.kafka.connect.errors.RetriableException:java.sql.SQLException:java.sql.BatchUpdateException:批分录0 INSERT INTO“客户”(“id”)VALUES(1)ON CONFLICT(“id”)DO UPDATE SET被中止:错误:输入connect_1结尾处有语法错误|
Position:77调用getNextException来查看批处理中的其他错误。
这是我的source.json

{
"name": "src-table",
"config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "database.hostname": "postgres1_container",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "postgres",
    "database.whitelist": "postgres",
    "database.server.name": "postgres1",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "schema-changes.inventory",
    "transforms": "route",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
    "transforms.route.replacement": "$3"
}

这是我的jdbc-sink.json

{
    "name": "jdbc-sink",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "topics": "customers",
        "connection.url": "jdbc:postgresql://postgres2_container:5432/postgres?user=postgres&password=postgres",
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
        "auto.create": "true",
        "insert.mode": "upsert",
        "pk.fields": "id",
        "pk.mode": "record_value"
    }
}

代苯/Zookeeper:0.9
德贝西翁/Kafka:0.9
德贝西铵/波斯特格雷:9.6
debezium/连接:0.9
PostgreSQL JDBC驱动程序42.2.5
Kafka连接JDBC 5.2.1
我试图降级jdbc驱动程序和confluentKafka连接,但仍然有相同的错误

fwzugrvs

fwzugrvs1#

解决问题,因为我在postgres1中创建表时,没有将id设置为PK值

ggazkfy8

ggazkfy82#

同样的问题,
我认为这是JDBC连接器上的一个问题,当表只有主键列而没有其他列时,没有什么要更新的,因此语句语法是错误的,因为它总是在发生冲突后排除要更新的列。
一种解决方案是向该表中添加额外的列,当然这不是解决方案,而是一种快速而肮脏的解决方法。
另一个解决方案是升级JDBC,我用kafka-connect-jdbc-10.4.0进行了同样的测试,似乎这个问题不再存在。

相关问题