我同时使用s3和jdbc sink连接器,并且在存储数据时遇到了一些奇怪的行为。为了进行一些协调,我真的希望将kafka摄取时间或记录生成时间保存到存储在sink系统中的数据中。
我在看文件,但没有找到这个。我使用的是汇合连接器,但如果它允许我这样做的话,我也可以使用其他连接器,比如camel。
有人能给我点建议吗?
更新:根据一位板球运动员的良好反馈,我明白我应该看看这个:https://docs.confluent.io/5.5.0/connect/transforms/insertfield.html#insertfield
我还看到了这个例子:kafka连接消费者引用偏移量并存储在消息中
我将测试它,但我是否正确理解,例如,理论上我可以这样做:
"transforms": "InsertField",
"transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.InsertField.offset.field": "recordOffset"
"transforms.InsertField.partition.field": "recordPartition"
"transforms.InsertField.timestamp.field": "recordTimestamp"
这将在记录中为我创建3个新属性,名为recordoffset、recordpartition和recordtimestamp,其中包含所描述的值。
如果我想确保值始终存在或失败,我需要做(不确定我是否理解后缀部分):
"transforms": "InsertField",
"transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"!transforms.InsertField.offset.field": "recordOffset"
"!transforms.InsertField.partition.field": "recordPartition"
"!transforms.InsertField.timestamp.field": "recordTimestamp"
1条答案
按热度按时间cyvaqqii1#
作为
@OneCricketeer
他说InsertField
单消息转换在这里完成工作。下面是使用它的s3接收器配置示例:请注意,它还使用
TimestampConverter
在字符串中格式化时间戳-默认情况下是epoch。你的问题促使我正确地写了这篇文章,并记录了一个小教程-你可以在这里看到:https://youtu.be/3gj_soyutyk