Kafka Connect - Debezium - Oracle JDBC sink connector error when consuming Debezium (Oracle) CDC events

Asked by j5fpnvbx on 2023-02-03 in Apache

I have the following images running in Docker:
quay.io/debezium/connect:2.0
quay.io/debezium/kafka:2.0
quay.io/debezium/zookeeper:2.0
container-registry.oracle.com/database/enterprise:latest

Debezium source connector configuration:

"name": "customers-connector",
  "config": {
    "connector.class": "io.debezium.connector.oracle.OracleConnector",
    "tasks.max": "1",
    "topic.prefix": "server1",
    "database.hostname": "dbz_oracle21",
    "database.port": "1521",
    "database.user": "c##dbzuser",
    "database.password": "dbz",
    "database.dbname": "ORCLCDB",
    "database.pdb.name": "ORCLPDB1",
    "database.server.name": "server1",
    "table.include.list": "C##DBZUSER.CUSTOMERS",
    "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
    "schema.history.internal.kafka.topic": "schema-changes.customers",
    "internal.key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "internal.value.converter":"org.apache.kafka.connect.json.JsonConverter",
        "internal.key.converter.schemas.enable":false,
        "internal.value.converter.schemas.enable":false,
    "transforms": "route",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
    "transforms.route.replacement": "$3"

Oracle JDBC sink connector configuration:

"name": "jdbc-sink-2",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "topics": "CUSTOMERS",
        "table.name.format": "kafka_customers",
        "connection.url": "jdbc:oracle:thin:@dbz_oracle21:1521/orclpdb1",
        "connection.user": "c##sinkuser",
        "connection.password": "sinkpw",
        "auto.create":true,
        "auto.evolve":true,
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "pk.fields": "id",
        "insert.mode":"insert",
        "pk.mode": "record_key"
    }
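
(Aside: the events below carry the JsonConverter schema/payload envelope, which the JDBC sink requires in order to map record fields to table columns. If the converters ever need to be set per connector rather than on the worker, the equivalent settings would be, for example:

    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "true",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "true"

These standard Kafka Connect converter properties are not shown in the configs above.)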

I can see the CDC events being published to the Kafka topic "CUSTOMERS" (the route transform rewrites Debezium's default topic name server1.C##DBZUSER.CUSTOMERS to its third dot-separated segment):

{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"ID"}],"optional":false,"name":"server1.C__DBZUSER.CUSTOMERS.Key"},"payload":{"ID":1011}}       {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"ID"},{"type":"string","optional":true,"field":"NAME"}],"optional":true,"name":"server1.C__DBZUSER.CUSTOMERS.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"ID"},{"type":"string","optional":true,"field":"NAME"}],"optional":true,"name":"server1.C__DBZUSER.CUSTOMERS.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"string","optional":true,"field":"txId"},{"type":"string","optional":true,"field":"scn"},{"type":"string","optional":true,"field":"commit_scn"},{"type":"string","optional":true,"field":"lcr_position"},{"type":"string","optional":true,"field":"rs_id"},{"type":"int32","optional":true,"field":"ssn"},{"type":"int32","optional":true,"field":"redo_thread"},{"type":"string","optional":true,"field":"user_name"}],"optional":false,"name":"io.debezium.connector.oracle.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"}],"optional":false,"name":"server1.C__DBZUSER.CUSTOMERS.Envelope","version":1},"payload":{"before":{"ID":1011,"NAME":"r 3"},"after":{"ID":1011,"NAME":"233"},"source":{"version":"2.0.1.Final","connector":"oracle","name":"server1","ts_ms":1674978001000,"snapshot":"false","db":"ORCLPDB1","sequence":null,"schema":"C##DBZUSER","table":"CUSTOMERS","txId":"0a001b007a020000","scn":"3252353","commit_scn":"3252452","lcr_position":null,"rs_id":null,"ssn":0,"redo_thread":1,"user_name":"C##DBZUSER"},"op":"u","ts_ms":1674978030086,"transaction":null}}

When I try to consume these CDC events from the topic "CUSTOMERS" with the sink connector configuration above, I see this error in the connector log:

tition CUSTOMERS-0   [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
2023-01-29 14:57:13,174 INFO   ||  [Consumer clientId=connector-consumer-jdbc-sink-2-0, groupId=connect-jdbc-sink-2] Resetting offset for partition CUSTOMERS-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[172.17.0.3:9092 (id: 1 rack: null)], epoch=0}}.   [org.apache.kafka.clients.consumer.internals.SubscriptionState]
2023-01-29 14:57:13,181 INFO   ||  Attempting to open connection #1 to Oracle   [io.confluent.connect.jdbc.util.CachedConnectionProvider]
2023-01-29 14:57:13,222 INFO   ||  JdbcDbWriter Connected   [io.confluent.connect.jdbc.sink.JdbcDbWriter]
2023-01-29 14:57:13,263 ERROR  ||  WorkerSinkTask{id=jdbc-sink-2-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: PK mode for table 'kafka_customers' is RECORD_KEY with configured PK fields [id], but record key schema does not contain field: id   [org.apache.kafka.connect.runtime.WorkerSinkTask]
org.apache.kafka.connect.errors.ConnectException: PK mode for table 'kafka_customers' is RECORD_KEY with configured PK fields [id], but record key schema does not contain field: id
        at io.confluent.connect.jdbc.sink.metadata.FieldsMetadata.extractRecordKeyPk(FieldsMetadata.java:208)
        at io.confluent.connect.jdbc.sink.metadata.FieldsMetadata.extract(FieldsMetadata.java:97)
        at io.confluent.connect.jdbc.sink.metadata.FieldsMetadata.extract(FieldsMetadata.java:63)
        at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:114)
        at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:66)
        at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:74)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:244)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
2023-01-29 14:57:13,263 ERROR  ||  WorkerSinkTask{id=jdbc-sink-2-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted   [org.apache.kafka.connect.runtime.WorkerTask]
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:611)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:244)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.ConnectException: PK mode for table 'kafka_customers' is RECORD_KEY with configured PK fields [id], but record key schema does not contain field: id
        at io.confluent.connect.jdbc.sink.metadata.FieldsMetadata.extractRecordKeyPk(FieldsMetadata.java:208)
        at io.confluent.connect.jdbc.sink.metadata.FieldsMetadata.extract(FieldsMetadata.java:97)
        at io.confluent.connect.jdbc.sink.metadata.FieldsMetadata.extract(FieldsMetadata.java:63)
        at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:114)
        at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:66)
        at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:74)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
        ... 10 more
2023-01-29 14:57:13,263 INFO   ||  Stopping task   [io.confluent.connect.jdbc.sink.JdbcSinkTask]
Answer (tnkciper):

Oracle is case-insensitive for unquoted identifiers, so even though the column is named ID, a query like select id from tab works.
But most applications *quote* column names, so the configuration

"pk.fields": "id"

would produce something like select "id" from tab,
which triggers the error record key schema does not contain field: id.
Fix the *case*: the record key schema in the event above contains the field ID in uppercase, not id.
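
A minimal sketch of the fix in the sink configuration, matching the uppercase ID field of the key schema shown in the event above:

    "pk.mode": "record_key",
    "pk.fields": "ID"

The stack trace (FieldsMetadata.extractRecordKeyPk) shows the sink resolving pk.fields against the record key schema by exact, case-sensitive name before any SQL is generated, so the configured field must be spelled ID.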
