我有一个s3接收器连接器用于多个主题(topic\u a,topic\u b,topic\u c),topic\u a有字段created\u date,topic\u b,topic\u c有created\u date。我用过下面的 transforms.RenameField.renames
重命名字段(已创建)_date:creation_date)但是,由于只有一个主题创建了日期,而其他主题没有,所以连接器出现故障。
我想将所有消息(来自带有单个连接器的所有主题)移动到带有creation\u date的s3中(如果存在,则将created\u date重命名为creation\u date),但我无法找出regex或transformer来重命名特定主题的字段(如果存在)。
"config":{
"connector.class":"io.confluent.connect.s3.S3SinkConnector",
"errors.log.include.messages":"true",
"s3.region":"eu-west-1",
"topics.dir":"dir",
"flush.size":"5",
"tasks.max":"2",
"s3.part.size":"5242880",
"timezone":"UTC",
"locale":"en",
"format.class":"io.confluent.connect.s3.format.json.JsonFormat",
"errors.log.enable":"true",
"s3.bucket.name":"bucket",
"topics": "topic_a, topic_b, topic_c",
"s3.compression.type":"gzip",
"partitioner.class":"io.confluent.connect.storage.partitioner.DailyPartitioner",
"name":"NAME",
"storage.class":"io.confluent.connect.s3.storage.S3Storage",
"key.converter.schemas.enable":"true",
"key.converter":"org.apache.kafka.connect.storage.StringConverter",
"value.converter.schemas.enable":"true",
"value.converter":"io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url":"https://schemaregistry.com",
"enhanced.avro.schema.support": "true",
"transforms": "RenameField",
"transforms.RenameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.RenameField.renames": "created_date:creation_date"
}
1条答案
按热度按时间62lalag41#
只有主题a创建了日期,其他人没有,
然后你会使用不同的连接器。一个带有变换,所有主题带有字段,然后另一个没有变换。
从带有单个连接器的所有主题
这个比例不太好。你正在制作有限的用户线程和一个用户组来同时阅读许多主题。多个连接器将更好地分配负载。