如何在kafka connect smt中重命名/替换结构中的字段?

oyjwcjzk  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(453)

replacefield smt的描述说它可以 Filter or rename fields within a Struct or Map. 但是,我找不到任何在结构中替换或重命名字段的工作示例。
我用kafka connect elasticsearch接收器将一个主题中的数据写入elasticsearch。为简单起见,假设数据的格式如下所示。

{
  'ID':22, 
  'ITEM': 'Shampoo'
  'USER':{
    'NAME': 'jon', 
    'AGE':25
   }
}

所以如果我想重新命名/替换 USER.NAME 或者 USER.AGE ,如何在连接器中配置(我已经用ksqldb写了所有的东西)。这是我的当前配置,我在其中重命名 ITEMproduct 以及 IDid ```
CREATE SINK CONNECTOR ELASTIC_SINK WITH (
'connector.class' = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
'connection.url' = 'http://host.docker.internal:9200',
'type.name' = '_doc',
'topics' = 'ELASTIC_TOPIC',
'key.ignore' = 'false',
'schema.ignore' = 'true',
'transforms' = 'RenameField',
'transforms.RenameField.type' = 'org.apache.kafka.connect.transforms.ReplaceField$Value',
'transforms.RenameField.renames' = 'ITEM:product,ID:id',
);

ix0qys7i

ix0qys7i1#

看看现有的so问答:https://stackoverflow.com/a/56601093/4778022
您可以提供要重命名的字段的路径,并用句点分隔部分。

CREATE SINK CONNECTOR ELASTIC_SINK WITH (
    'connector.class' = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
    'connection.url' = 'http://host.docker.internal:9200',
    'type.name' = '_doc',
    'topics' = 'ELASTIC_TOPIC',
    'key.ignore' = 'false',
    'schema.ignore' = 'true',
    'transforms' = 'RenameField',
    'transforms.RenameField.type' = 'org.apache.kafka.connect.transforms.ReplaceField$Value',
    'transforms.RenameField.renames' = 'USER.NAME:name,ITEM:product,ID:id',
);

相关问题