Kafka接收器连接器能否将记录时间戳作为有效负载存储在存储器中

hs1ihplo  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(378)

我同时使用s3和jdbc sink连接器,并且在存储数据时遇到了一些奇怪的行为。为了进行一些协调,我真的希望将kafka摄取时间或记录生成时间保存到存储在sink系统中的数据中。
我在看文件,但没有找到这个。我使用的是汇合连接器,但如果它允许我这样做的话,我也可以使用其他连接器,比如camel。
有人能给我点建议吗?
更新:根据一位板球运动员的良好反馈,我明白我应该看看这个:https://docs.confluent.io/5.5.0/connect/transforms/insertfield.html#insertfield
我还看到了这个例子:kafka连接消费者引用偏移量并存储在消息中
我将测试它,但我是否正确理解,例如,理论上我可以这样做:

"transforms": "InsertField",
"transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.InsertField.offset.field": "recordOffset"
"transforms.InsertField.partition.field": "recordPartition"
"transforms.InsertField.timestamp.field": "recordTimestamp"

这将在记录中为我创建3个新属性,名为recordoffset、recordpartition和recordtimestamp,其中包含所描述的值。
如果我想确保值始终存在或失败,我需要做(不确定我是否理解后缀部分):

"transforms": "InsertField",
"transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"!transforms.InsertField.offset.field": "recordOffset"
"!transforms.InsertField.partition.field": "recordPartition"
"!transforms.InsertField.timestamp.field": "recordTimestamp"
cyvaqqii

cyvaqqii1#

作为 @OneCricketeer 他说 InsertField 单消息转换在这里完成工作。下面是使用它的s3接收器配置示例:

{
          "connector.class"        : "io.confluent.connect.s3.S3SinkConnector",
          "storage.class"          : "io.confluent.connect.s3.storage.S3Storage",
          "s3.region"              : "us-west-2",
          "s3.bucket.name"         : "rmoff-smt-demo-01",
          "topics"                 : "customers,transactions",
          "tasks.max"              : "4",
          "flush.size"             : "16",
          "format.class"           : "io.confluent.connect.s3.format.json.JsonFormat",
          "schema.generator.class" : "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
          "schema.compatibility"   : "NONE",
          "partitioner.class"      : "io.confluent.connect.storage.partitioner.DefaultPartitioner",
          "transforms"                          : "insertTS,formatTS",
          "transforms.insertTS.type"            : "org.apache.kafka.connect.transforms.InsertField$Value",
          "transforms.insertTS.timestamp.field" : "messageTS",
          "transforms.formatTS.type"            : "org.apache.kafka.connect.transforms.TimestampConverter$Value",
          "transforms.formatTS.format"          : "yyyy-MM-dd HH:mm:ss:SSS",
          "transforms.formatTS.field"           : "messageTS",
          "transforms.formatTS.target.type"     : "string"        
        }

请注意,它还使用 TimestampConverter 在字符串中格式化时间戳-默认情况下是epoch。
你的问题促使我正确地写了这篇文章,并记录了一个小教程-你可以在这里看到:https://youtu.be/3gj_soyutyk

相关问题