My setup is based on Docker containers: one Oracle DB, Kafka, Kafka Connect, and Postgres. I first use the Oracle CDC connector to feed Kafka, which works fine. I am then trying to read that topic and write it into Postgres with the JDBC sink connector. When I start the sink connector, I get:
"trace": "org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:614)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:834)\nCaused by: org.apache.kafka.connect.errors.ConnectException: java.sql.SQLException: Exception chain:\norg.postgresql.util.PSQLException: ERROR: cross-database references are not implemented: "ORCLCDB.C__MYUSER.EMP"\n Position: 14\n\n\tat io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:122)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:586)\n\t... 10 more\nCaused by: java.sql.SQLException: Exception chain:\norg.postgresql.util.PSQLException: ERROR: cross-database references are not implemented: "ORCLCDB.C__MYUSER.EMP"\n Position: 14\n\n\tat io.confluent.connect.jdbc.sink.JdbcSinkTask.getAllMessagesException(JdbcSinkTask.java:150)\n\tat io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:102)\n\t... 11 more\n"
My connector config JSON looks like this:
{
"name": "SimplePostgresSink",
"config":{
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"name": "SimplePostgresSink",
"tasks.max":1,
"topics": "ORCLCDB.C__MYUSER.EMP",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"confluent.topic.bootstrap.servers":"kafka:29092",
"connection.url": "jdbc:postgresql://postgres:5432/postgres",
"connection.user": "postgres",
"connection.password": "postgres",
"insert.mode": "upsert",
"pk.mode": "record_value",
"pk.fields": "I",
"auto.create": "true",
"auto.evolve": "true"
}
}
And the topic schema is:
{
"type": "record",
"name": "ConnectDefault",
"namespace": "io.confluent.connect.avro",
"fields": [
{
"name": "I",
"type": {
"type": "bytes",
"scale": 0,
"precision": 64,
"connect.version": 1,
"connect.parameters": {
"scale": "0"
},
"connect.name": "org.apache.kafka.connect.data.Decimal",
"logicalType": "decimal"
}
},
{
"name": "NAME",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "table",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "scn",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "op_type",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "op_ts",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "current_ts",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "row_id",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "username",
"type": [
"null",
"string"
],
"default": null
}
]
}
I am only interested in the columns I and NAME.
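3 Answers
nqwrtyyt #1
This can also be caused by a mismatch between the database name in connection.url and the database name in table.name.format. I ran into this recently and thought it might help others.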
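x8goxv8g #2
Looking at the logs, I found that the connector fails when it tries to create a table with the following name:
[2022-01-07 23:56:11,737] Checking PostgreSql dialect for existence of TABLE "ORCLCDB"."C__MYUSER"."EMP" (io.confluent.connect.jdbc.dialect.GenericDatabaseDialect)
[2022-01-07 23:56:11,764] Using PostgreSql dialect TABLE "ORCLCDB"."C__MYUSER"."EMP" absent (io.confluent.connect.jdbc.dialect.GenericDatabaseDialect)
[2022-01-07 23:56:11,764] Creating table with sql
[2022-01-07 23:56:11,765] WARN Create failed, will attempt amend if table already exists (io.confluent.connect.jdbc.sink.DbStructure)
ERROR: cross-database references are not implemented: "ORCLCDB.C__MYUSER.EMP"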
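The log shows the connector turning the dotted topic name into a three-part identifier. One possible workaround, sketched here and not tested against this setup, is to rename the topic in flight with Kafka Connect's built-in RegexRouter transform so that only the last segment reaches the sink. The transform alias `stripPrefix` is an arbitrary name chosen for this example; these keys would be added inside the existing "config" object.

```json
{
  "transforms": "stripPrefix",
  "transforms.stripPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
  "transforms.stripPrefix.regex": "ORCLCDB\\.C__MYUSER\\.(.*)",
  "transforms.stripPrefix.replacement": "$1"
}
```

With this in place the record's topic becomes EMP before the sink derives the table name, so the connector should look for a table called EMP in the database named in connection.url.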
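svujldwt #3
Expanding on Steven's comment:
This also happens if the topic name contains periods.
If table.name.format is not set, it defaults to ${topic}, so the table name is the topic name itself. PostgreSQL then reads ORCLCDB.C__MYUSER.EMP as database.schema.table, and because ORCLCDB is not the database you are connected to, it rejects the statement as a cross-database reference.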
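A minimal sketch of the fix this answer points to: set table.name.format explicitly so the table name no longer contains periods. The target table name EMP is an assumption for illustration; any name without periods that is valid in the postgres database from connection.url should behave the same way. The key goes inside the existing "config" object.

```json
{
  "topics": "ORCLCDB.C__MYUSER.EMP",
  "connection.url": "jdbc:postgresql://postgres:5432/postgres",
  "table.name.format": "EMP"
}
```

Because the connector quotes identifiers by default, the table would be created as "EMP" in upper case, so queries in psql need the quoted form.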