使用kafkaconnect将数据摄取到s3时如何基于json字段的一部分进行分区

jvlzgdj9  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(312)

我正在尝试使用s3 sink kafka连接器将一些json数据存储到s3中。我的json格式如下:

{
   "server": someserver,
   "id": someid,
   "time": "2018-01-18T23:47:03.737487Z"
}

我想根据数据所在的时间对数据进行分区,但忽略分和秒。例如,上述json将属于2018-01-18t23目录。如何在属性文件中设置field.partition来实现这一点?
谢谢!

xnifntxz

xnifntxz1#

要实现confluent的s3连接器所描述的功能,一个粗略的方法是:
定义属性 timestamp.extractor 成为 RecordField ,从记录上的字段中提取时间戳。
设置属性 timestamp.field 记录字段的名称( time 在你的例子中)
path.format 财产。这将允许您将文件存储到一小时,正如您在示例中提到的,忽略更精细的粒度(分钟、秒等)。
同时设置 partition.duration.ms 为您提供有意义的粒度。重要的是 -1 将不允许您使用基于时间的分区。
最后,设置属性 locale 以及 timezone 如果您使用的是预定义的分区器或自定义的基于时间的相关分区器,也可以这样做。
请注意,连接器附带了一个预定义的基于时间的分区器类,您可能会发现它对您的用例很有用。您可以通过设置来使用它: partitioner.class=io.confluent.connect.storage.partitioner.HourlyPartitioner

相关问题