I have the following images running in Docker:
quay.io/debezium/connect:2.0
quay.io/debezium/kafka:2.0
quay.io/debezium/zookeeper:2.0
container-registry.oracle.com/database/enterprise:latest
Debezium source connector configuration:
{
  "name": "customers-connector",
  "config": {
    "connector.class": "io.debezium.connector.oracle.OracleConnector",
    "tasks.max": "1",
    "topic.prefix": "server1",
    "database.hostname": "dbz_oracle21",
    "database.port": "1521",
    "database.user": "c##dbzuser",
    "database.password": "dbz",
    "database.dbname": "ORCLCDB",
    "database.pdb.name": "ORCLPDB1",
    "database.server.name": "server1",
    "table.include.list": "C##DBZUSER.CUSTOMERS",
    "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
    "schema.history.internal.kafka.topic": "schema-changes.customers",
    "internal.key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "internal.value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "internal.key.converter.schemas.enable": false,
    "internal.value.converter.schemas.enable": false,
    "transforms": "route",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
    "transforms.route.replacement": "$3"
  }
}
Oracle JDBC sink connector configuration:
{
  "name": "jdbc-sink-2",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "CUSTOMERS",
    "table.name.format": "kafka_customers",
    "connection.url": "jdbc:oracle:thin:@dbz_oracle21:1521/orclpdb1",
    "connection.user": "c##sinkuser",
    "connection.password": "sinkpw",
    "auto.create": true,
    "auto.evolve": true,
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "pk.fields": "id",
    "insert.mode": "insert",
    "pk.mode": "record_key"
  }
}
I can see the CDC events being published to the Kafka topic "CUSTOMERS" (record key first, then record value):
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"ID"}],"optional":false,"name":"server1.C__DBZUSER.CUSTOMERS.Key"},"payload":{"ID":1011}}
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"ID"},{"type":"string","optional":true,"field":"NAME"}],"optional":true,"name":"server1.C__DBZUSER.CUSTOMERS.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"ID"},{"type":"string","optional":true,"field":"NAME"}],"optional":true,"name":"server1.C__DBZUSER.CUSTOMERS.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"string","optional":true,"field":"txId"},{"type":"string","optional":true,"field":"scn"},{"type":"string","optional":true,"field":"commit_scn"},{"type":"string","optional":true,"field":"lcr_position"},{"type":"string","optional":true,"field":"rs_id"},{"type":"int32","optional":true,"field":"ssn"},{"type":"int32","optional":true,"field":"redo_thread"},{"type":"string","optional":true,"field":"user_name"}],"optional":false,"name":"io.debezium.connector.oracle.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"}],"optional":false,"name":"server1.C__DBZUSER.CUSTOMERS.Envelope","version":1},"payload":{"before":{"ID":1011,"NAME":"r 3"},"after":{"ID":1011,"NAME":"233"},"source":{"version":"2.0.1.Final","connector":"oracle","name":"server1","ts_ms":1674978001000,"snapshot":"false","db":"ORCLPDB1","sequence":null,"schema":"C##DBZUSER","table":"CUSTOMERS","txId":"0a001b007a020000","scn":"3252353","commit_scn":"3252452","lcr_position":null,"rs_id":null,"ssn":0,"redo_thread":1,"user_name":"C##DBZUSER"},"op":"u","ts_ms":1674978030086,"transaction":null}}
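For reference, the short topic name comes from the route transform in the source connector configuration. The sketch below reproduces that rewrite in Python. It assumes the connector's original topic name is server1.C__DBZUSER.CUSTOMERS (inferred from the schema names in the event, not confirmed by the logs) and that the transform only rewrites names that match the pattern in full. The route function is a hypothetical helper, not part of Kafka Connect.

```python
import re

# Same pattern as "transforms.route.regex"; the JSON "\\." is a literal dot.
ROUTE_REGEX = re.compile(r"([^.]+)\.([^.]+)\.([^.]+)")


def route(topic: str) -> str:
    """Return the third dot-separated segment if the whole name matches,
    otherwise return the topic name unchanged (hypothetical helper)."""
    match = ROUTE_REGEX.fullmatch(topic)
    return match.group(3) if match else topic


# Assumed original topic name, inferred from the event's schema names.
print(route("server1.C__DBZUSER.CUSTOMERS"))
# A name with only two segments does not match and is left as-is.
print(route("schema-changes.customers"))
```

The rewritten name keeps the table's upper-case spelling, which is why the sink subscribes to CUSTOMERS.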
When I try to sink these CDC events from the topic "CUSTOMERS" using the sink connector configuration, I see this error in the connector log:
tition CUSTOMERS-0 [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
2023-01-29 14:57:13,174 INFO || [Consumer clientId=connector-consumer-jdbc-sink-2-0, groupId=connect-jdbc-sink-2] Resetting offset for partition CUSTOMERS-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[172.17.0.3:9092 (id: 1 rack: null)], epoch=0}}. [org.apache.kafka.clients.consumer.internals.SubscriptionState]
2023-01-29 14:57:13,181 INFO || Attempting to open connection #1 to Oracle [io.confluent.connect.jdbc.util.CachedConnectionProvider]
2023-01-29 14:57:13,222 INFO || JdbcDbWriter Connected [io.confluent.connect.jdbc.sink.JdbcDbWriter]
2023-01-29 14:57:13,263 ERROR || WorkerSinkTask{id=jdbc-sink-2-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: PK mode for table 'kafka_customers' is RECORD_KEY with configured PK fields [id], but record key schema does not contain field: id [org.apache.kafka.connect.runtime.WorkerSinkTask]
org.apache.kafka.connect.errors.ConnectException: PK mode for table 'kafka_customers' is RECORD_KEY with configured PK fields [id], but record key schema does not contain field: id
at io.confluent.connect.jdbc.sink.metadata.FieldsMetadata.extractRecordKeyPk(FieldsMetadata.java:208)
at io.confluent.connect.jdbc.sink.metadata.FieldsMetadata.extract(FieldsMetadata.java:97)
at io.confluent.connect.jdbc.sink.metadata.FieldsMetadata.extract(FieldsMetadata.java:63)
at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:114)
at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:66)
at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:74)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:244)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
2023-01-29 14:57:13,263 ERROR || WorkerSinkTask{id=jdbc-sink-2-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted [org.apache.kafka.connect.runtime.WorkerTask]
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:611)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:244)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.ConnectException: PK mode for table 'kafka_customers' is RECORD_KEY with configured PK fields [id], but record key schema does not contain field: id
at io.confluent.connect.jdbc.sink.metadata.FieldsMetadata.extractRecordKeyPk(FieldsMetadata.java:208)
at io.confluent.connect.jdbc.sink.metadata.FieldsMetadata.extract(FieldsMetadata.java:97)
at io.confluent.connect.jdbc.sink.metadata.FieldsMetadata.extract(FieldsMetadata.java:63)
at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:114)
at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:66)
at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:74)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
... 10 more
2023-01-29 14:57:13,263 INFO || Stopping task [io.confluent.connect.jdbc.sink.JdbcSinkTask]
1 Answer
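Oracle treats unquoted identifiers as case-insensitive, so even if the column is named ID, select id from tab works. But most applications quote column names, so your configuration ends up producing something like select "id" from tab, and a quoted name is matched case-sensitively. The same mismatch triggers the error:
record key schema does not contain field: id
Fix the case. The record key in your event has the field ID, so "pk.fields" should be "ID" rather than "id".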
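A minimal sketch of why the lookup fails, assuming the sink compares the configured pk.fields against the key schema's field names case-sensitively (which is what the error message suggests). The helpers key_field_names and missing_pk_fields are hypothetical and are not part of the JDBC sink connector. The key JSON is copied from the event in the question.

```python
import json

# Record key as published to the topic, copied from the question.
RECORD_KEY = (
    '{"schema":{"type":"struct","fields":'
    '[{"type":"int32","optional":false,"field":"ID"}],'
    '"optional":false,"name":"server1.C__DBZUSER.CUSTOMERS.Key"},'
    '"payload":{"ID":1011}}'
)


def key_field_names(record_key_json: str) -> list:
    """Return the field names declared in the record key schema."""
    schema = json.loads(record_key_json)["schema"]
    return [field["field"] for field in schema["fields"]]


def missing_pk_fields(pk_fields: list, schema_fields: list) -> list:
    """Return the configured PK fields that the key schema does not
    contain, using an exact (case-sensitive) comparison."""
    return [name for name in pk_fields if name not in schema_fields]


fields = key_field_names(RECORD_KEY)
print(fields)                             # field names in the key schema
print(missing_pk_fields(["id"], fields))  # pk.fields as in the question
print(missing_pk_fields(["ID"], fields))  # pk.fields with the case fixed
```

With "pk.fields": "id" the field is reported as missing. With "pk.fields": "ID" nothing is missing.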