处理来自jdbc接收器连接器中ksqldb流的tombstone记录

mcvgt66p  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(244)

因为ksqldb中的kstreams支持原始数据类型( String 或者 BigInt ),有没有办法处理来自kstreams的墓碑记录?
ValueToKey smt,可以转换为 PK 必须是mysql表/白名单列的一部分,不能被列入黑名单。有没有一种方法可以从一个列字段中获取值并将其Map到不同的键名?
它将有助于墓碑记录,直到关键的支持是没有释放合流。
例如-在ksql流中,

KEY - {PK1=ABC, PK2=1} (STRING format)
VALUE - {PK1=null, PK2=null, COL3=null, COL4=null} (AVRO format)

自从 KEY 格式为 STRING ,接收器连接器密钥格式必须为 STRING 而不是json/avro。在这个场景中,是否有任何方法可以处理墓碑记录?
我创建了两列 _KEY pk字段的后缀,以便我可以Map这些字段的值 _KEY 列到实际pk列并黑名单 _KEY 后缀列。

KEY - {PK1=ABC, PK2=1} (STRING format)
VALUE - {PK1_KEY=ABC, PK2_KEY=1, PK1=null, PK2=null, COL3=null, COL4=null} (AVRO format)

在Flume连接器,我用 ValueToKey smt,以便接收器连接器将根据中的值从mysql中删除记录 _KEY 为列添加后缀并采用原始列名,如-

delete from TEST where PK1='ABC' and PK2=1;
"delete.enabled": "true",
"pk.mode": "record_key",
"pk.fields": "PK1,PK2",
"transforms": "ValueToKey,ReplaceField",
"transforms.ValueToKey.type":"org.apache.kafka.connect.transforms.ValueToKey",
"transforms.ValueToKey.fields": "PK1_KEY,PK2_KEY",
"transforms.ReplaceField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.ReplaceField.whitelist": "PK1,PK2,COL3,COL4"

有办法做到这一点吗?当我尝试使用上述接收器配置时,接收器连接器中出现错误。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题