Kafka: cannot extract a field using an SMT with an Oracle database

Posted by pgky5nke on 2023-03-07 in Apache

I'm unable to use the ExtractField SMT to extract a field from the key struct into a simple long value with an Oracle database. The same transformation works fine with a Postgres database.
I tried the ReplaceField SMT to rename the key field, and that works fine. I suspect a problem in how the class org.apache.kafka.connect.transforms.ExtractField handles the schema when looking up the field, since schema handling seems to differ between ReplaceField and ExtractField.
Oracle database version: Oracle Database 19c Enterprise Edition Release 19.0.0.0.0 - Production, Version 19.8.0.0.0
Debezium Connect: 1.6
Kafka version: 2.7.0
Instant Client Basic (Oracle client and drivers): 21.3.0.0.0
I get an "Unknown field: ID_MYTABLE" error:
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:206)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
    at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:339)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:264)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.IllegalArgumentException: Unknown field: ID_MYTABLE
    at org.apache.kafka.connect.transforms.ExtractField.apply(ExtractField.java:65)
    at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
    ... 11 more
Here is my Kafka connector configuration:

{
  "name": "oracle-connector",  
  "config": {   
    "connector.class": "io.debezium.connector.oracle.OracleConnector", 
    "tasks.max": "1", 
    "database.server.name": "serverName", 
    "database.user": "c##dbzuser", 
    "database.password": "dbz", 
    "database.url": "jdbc:oracle:thin:...", 
    "database.dbname": "dbName", 
    "database.pdb.name": "PDBName", 
    "database.connection.adapter": "logminer", 
    "database.history.kafka.bootstrap.servers": "kafka:9092", 
    "database.history.kafka.topic": "schema-changes.data", 
    "schema.include.list": "mySchema", 
    "table.include.list": "mySchema.myTable", 
    "log.mining.strategy": "online_catalog", 
    "snapshot.mode": "initial", 
    "key.converter": "org.apache.kafka.connect.json.JsonConverter", 
    "key.converter.schemas.enable": "false", 
    "value.converter": "io.confluent.connect.avro.AvroConverter", 
    "value.converter.schemas.enable": "true", 
    "value.converter.schema.registry.url": "http://schema-registry:8081", 
    "transforms": "unwrap,route,extractField",
    "transforms.extractField.type": "org.apache.kafka.connect.transforms.ExtractField$Key", 
    "transforms.extractField.field": "ID_MYTABLE", 
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState", 
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter", 
    "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)", 
    "transforms.route.replacement": "$1_$2_$3" 
  } 
}

Answer:

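By default, when you configure an SMT for any connector (including Debezium), Kafka Connect applies the transformation to every record the connector emits. That includes change event messages that do not carry the data and fields you expect, so ExtractField fails on any record whose key has no ID_MYTABLE field. A likely reason you only see this with Oracle is that the Debezium Oracle connector also publishes schema change events, by default to a topic named after database.server.name, and their keys do not contain ID_MYTABLE; the Postgres connector does not emit schema change events. To solve this, apply the SMT selectively, only to the subset of change event messages Debezium produces for your table, by using an SMT predicate.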
The official documentation is here.
In your particular case, you can apply the SMT only to the output topic of that specific database table. It would look something like this:

# Create a predicate that matches your output topic (the pattern is a regular expression)
predicates: topicNameMatch
predicates.topicNameMatch.type: org.apache.kafka.connect.transforms.predicates.TopicNameMatches
predicates.topicNameMatch.pattern: <output topic name goes here>

# Your logic to extract the field from the key
transforms.extractField.type: org.apache.kafka.connect.transforms.ExtractField$Key
transforms.extractField.field: ID_MYTABLE

# Apply extractField only to records that match the predicate above
transforms.extractField.predicate: topicNameMatch
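
For the connector in the question, which is configured through JSON, the same settings could be merged into its "config" object roughly as below. This is a sketch, not a tested configuration, and it assumes the connector captures only mySchema.myTable, as in the question. Predicates are evaluated when their transform runs, so extractField sees the topic name after route has rewritten serverName.<schema>.<table> to serverName_<schema>_<table>; the pattern therefore targets the routed name. The Debezium schema change topic, which by default is named just serverName, has no dots, is left unchanged by route, and does not match. The unwrap and route settings stay as they are.

```json
{
  "transforms": "unwrap,route,extractField",
  "transforms.extractField.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
  "transforms.extractField.field": "ID_MYTABLE",
  "transforms.extractField.predicate": "topicNameMatch",
  "predicates": "topicNameMatch",
  "predicates.topicNameMatch.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
  "predicates.topicNameMatch.pattern": "serverName_.*"
}
```

TopicNameMatches requires the regular expression to match the whole topic name, which is why the pattern ends in .*; if more tables are captured later, narrow it to this table's routed topic name.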

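If matching on the topic name does not work for your case, the official documentation mentioned above describes the other predicates; Kafka ships TopicNameMatches, HasHeaderKey and RecordIsTombstone.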
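
Another option, sketched under the assumption that the records lacking ID_MYTABLE are the Debezium schema change events and that they go to a topic named exactly serverName (the database.server.name value in the question): Kafka's transforms.<alias>.negate setting inverts a predicate, so extractField runs on every record except those on that topic.

```json
{
  "transforms.extractField.predicate": "topicNameMatch",
  "transforms.extractField.negate": "true",
  "predicates": "topicNameMatch",
  "predicates.topicNameMatch.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
  "predicates.topicNameMatch.pattern": "serverName"
}
```

Compared with matching the table's topic, this needs no change if the routed topic name changes, but any other topic whose keys lack ID_MYTABLE would fail in the same way.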
