I am unable to use the "ExtractField" SMT to extract a field from the key struct into a plain long value with an Oracle database. The same setup works fine with a Postgres database.
I tried the "ReplaceField" SMT to rename the key field, and that works fine. I suspect a problem in how the class "org.apache.kafka.connect.transforms.ExtractField" handles the schema when looking up the field. Schema handling seems to differ between "ReplaceField" and "ExtractField".
Oracle database version: Oracle Database 19c Enterprise Edition Release 19.0.0.0.0 - Production, Version 19.8.0.0.0
Debezium Connect: 1.6
Kafka version: 2.7.0
Instant Client Basic (Oracle client and drivers): 21.3.0.0.0
I get an "Unknown field: ID_MYTABLE" error:
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:206)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
    at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:339)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:264)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.IllegalArgumentException: Unknown field: ID_MYTABLE
    at org.apache.kafka.connect.transforms.ExtractField.apply(ExtractField.java:65)
    at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
    ... 11 more
Here is my Kafka connector configuration:
{
  "name": "oracle-connector",
  "config": {
    "connector.class": "io.debezium.connector.oracle.OracleConnector",
    "tasks.max": "1",
    "database.server.name": "serverName",
    "database.user": "c##dbzuser",
    "database.password": "dbz",
    "database.url": "jdbc:oracle:thin:...",
    "database.dbname": "dbName",
    "database.pdb.name": "PDBName",
    "database.connection.adapter": "logminer",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "schema-changes.data",
    "schema.include.list": "mySchema",
    "table.include.list": "mySchema.myTable",
    "log.mining.strategy": "online_catalog",
    "snapshot.mode": "initial",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schemas.enable": "true",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "transforms": "unwrap,route,extractField",
    "transforms.extractField.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
    "transforms.extractField.field": "ID_MYTABLE",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
    "transforms.route.replacement": "$1_$2_$3"
  }
}
1 Answer
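By default, when you configure an SMT for any connector (including Debezium), the transformation is applied to every record the connector emits. That includes change event messages that may not carry the data and fields the transformation expects. To work around this, apply the SMT selectively, to a specific subset of the change event messages Debezium produces, by using an SMT predicate.
The official documentation is here.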
In your specific case, you can apply the SMT only to the output topic of that particular database table.
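As a rough sketch, the properties below could be merged into the "config" object of the connector. The predicate alias `isMyTable` is a made-up name, and the pattern assumes the table topic has already been renamed by the `route` transform to `serverName_mySchema_myTable`; the actual topic name may differ (Oracle identifiers are often upper case, for example), so check the topics the connector really writes to. `TopicNameMatches` takes a Java regular expression that has to match the whole topic name, and predicates require Kafka Connect 2.6 or later.

```json
{
  "transforms": "unwrap,route,extractField",
  "transforms.extractField.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
  "transforms.extractField.field": "ID_MYTABLE",
  "transforms.extractField.predicate": "isMyTable",
  "predicates": "isMyTable",
  "predicates.isMyTable.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
  "predicates.isMyTable.pattern": "serverName_mySchema_myTable"
}
```

With this in place, records on any other topic (such as schema change events, whose key most likely has no `ID_MYTABLE` field) should pass through `extractField` untouched.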
If matching on the topic name does not work, the documentation mentioned above lists other predicates.