与Postgres的Kafka jdbc接收器连接器失败,原因是“未实现跨数据库引用”

zpf6vheq  于 2022-11-21  发布在  Apache
关注(0)|答案(3)|浏览(160)

My setup is based on docker containers - 1 oracle db, kafka, kafka connect and postgres. I first use oracle CDC connector to feed kafka which works fine. Then I am trying to read that topic and feed it into Postgres. When I start the connector I am getting:
"trace": "org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:614)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:834)\nCaused by: org.apache.kafka.connect.errors.ConnectException: java.sql.SQLException: Exception chain:\norg.postgresql.util.PSQLException: ERROR: cross-database references are not implemented: "ORCLCDB.C__MYUSER.EMP"\n Position: 14\n\n\tat io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:122)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:586)\n\t... 10 more\nCaused by: java.sql.SQLException: Exception chain:\norg.postgresql.util.PSQLException: ERROR: cross-database references are not implemented: "ORCLCDB.C__MYUSER.EMP"\n Position: 14\n\n\tat io.confluent.connect.jdbc.sink.JdbcSinkTask.getAllMessagesException(JdbcSinkTask.java:150)\n\tat io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:102)\n\t... 11 more\n"
My config json looks like :

{
"name": "SimplePostgresSink",
"config":{
  "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
  "name": "SimplePostgresSink",
  "tasks.max":1,
  "topics": "ORCLCDB.C__MYUSER.EMP",
  "key.converter": "io.confluent.connect.avro.AvroConverter",
  "key.converter.schema.registry.url": "http://schema-registry:8081",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "http://schema-registry:8081",
  "confluent.topic.bootstrap.servers":"kafka:29092",
  "connection.url": "jdbc:postgresql://postgres:5432/postgres",
  "connection.user": "postgres",
  "connection.password": "postgres",
  "insert.mode": "upsert",
  "pk.mode": "record_value",
  "pk.fields": "I",
  "auto.create": "true",
  "auto.evolve": "true"
}

}
And the topic schema is:

{
  "type": "record",
  "name": "ConnectDefault",
  "namespace": "io.confluent.connect.avro",
  "fields": [
    {
      "name": "I",
      "type": {
        "type": "bytes",
        "scale": 0,
        "precision": 64,
        "connect.version": 1,
        "connect.parameters": {
          "scale": "0"
        },
        "connect.name": "org.apache.kafka.connect.data.Decimal",
        "logicalType": "decimal"
      }
    },
    {
      "name": "NAME",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "table",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "scn",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "op_type",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "op_ts",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "current_ts",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "row_id",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "username",
      "type": [
        "null",
        "string"
      ],
      "default": null
    }
  ]
}

I am interested in the columns I and Name

nqwrtyyt

nqwrtyyt1#

这也可能是由于connection.url中的数据库名称与table.name.format中的数据库名称不匹配造成的。我最近遇到了这种情况,认为这可能会帮助其他人。

x8goxv8g

x8goxv8g2#

通过查看日志,我发现连接器在创建具有以下名称的表时出现问题:
[2022-01-07 23:56:11,737]正在检查PostgreSQL方言是否存在表“ORCLCDB”.“C__MYUSER”.“EMP”(io.confluent.connect.jdbc. dialect.GenericDatabaseDialect)[2022-01 - 07 23:56:11,764]正在使用PostgreSQL方言表“ORCLCDB”.“C__MYUSER”.“EMP”不存在(io. confluent.connect. jdbc. dialect. GenericDatabaseDialect)[2022 - 01 - 07 23:56:11,764]正在使用SQL方言创建表[2022-01-07 23:56:11,765]警告创建失败,如果表已经存在,将尝试修改(io.confluent. connect. jdbc.sink. DbStructure)。错误:未实现跨数据库引用:“我的用户”

svujldwt

svujldwt3#

Steven's comment上扩展
如果主题名称包含句号,也会发生这种情况
如果是table.name.format,则默认为${topicName}

相关问题