kafka connect jdbc sink quote.sql.identifiers不工作

pnwntuvh  于 2021-06-05  发布在  Kafka
关注(0)|答案(1)|浏览(508)

我正在尝试使用kafkaconnect,使用jdbc源和汇连接器将数据从旧db2数据库同步到postgres数据库。它工作得很好,但前提是我对表名使用的大小写非常严格。
例如,我在db2中有一个名为action的表,它也存在于具有相同列的postgres中,等等。唯一的区别是在db2中它是大写的 ACTION 在postgres中是小写的 action .
以下是一个有效的接收器文件:

{
    "name": "jdbc_sink_pg_action",
    "config": {
        "_comment": "The JDBC connector class",
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",

        "_comment": "How to serialise the value of keys ",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",

        "_comment": "As above, but for the value of the message. Note that these key/value serialisation settings can be set globally for Connect and thus omitted for individual connector configs to make them shorter and clearer",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",

        "_comment": " --- JDBC-specific configuration below here  --- ",

        "_comment": "JDBC connection URL.",
        "connection.url": "jdbc:postgresql://localhost:5435/postgres",
        "connection.user": "postgres",
        "connection.password": "*****",

        "topics": "ACTION",
        "table.name.format": "action",

        "_comment": "The insertion mode to use",
        "insert.mode": "upsert",

        "_comment": "The primary key mode",
        "pk.mode": "record_value",

        "_comment": "List of comma-separated primary key field names. The runtime interpretation of this config depends on the pk.mode",
        "pk.fields": "ACTION_ID",

        "quote.sql.identifiers": "never"
    }
}

这个可以,但不是很灵活。例如,我有许多其他表,我也想同步它们,但我不想为每个表创建一个连接器文件。所以我试着用: "table.name.format": "${topic}", 执行此操作时,在尝试加载接收器连接器时,日志中出现以下错误:
原因:org.apache.kafka.connect.errors.connectexception:缺少表“action”,并且禁用了自动创建
所以在我看来 "quote.sql.identifiers": "never" 实际上不起作用,否则接收器连接器正在执行的查询将不带引号,并且它将允许任何大小写(它将转换为lower)。
为什么这样不行?如果我只使用 ACTION 作为table.name.format。

osh3o9ms

osh3o9ms1#

您的postgresql表名( action )不等于主题名称( ACTION ). kafka connect jdbc连接器使用 getTables() 方法检查表是否存在,其中 tableNamePattern param区分大小写(根据文档: must match the table name as it is stored in the database ).
你可以用 ChangeTopicCase Kafka的转变将普通的转变联系起来。

相关问题