我正在尝试使用s3 sink kafka连接器将一些json数据存储到s3中。我的json格式如下:
{
"server": someserver,
"id": someid,
"time": "2018-01-18T23:47:03.737487Z"
}
我想根据数据所在的时间对数据进行分区,但忽略分和秒。例如,上述json将属于2018-01-18t23目录。如何在属性文件中设置field.partition来实现这一点?
谢谢!
我正在尝试使用s3 sink kafka连接器将一些json数据存储到s3中。我的json格式如下:
{
"server": someserver,
"id": someid,
"time": "2018-01-18T23:47:03.737487Z"
}
我想根据数据所在的时间对数据进行分区,但忽略分和秒。例如,上述json将属于2018-01-18t23目录。如何在属性文件中设置field.partition来实现这一点?
谢谢!
1条答案
按热度按时间xnifntxz1#
要实现confluent的s3连接器所描述的功能,一个粗略的方法是:
定义属性
timestamp.extractor
成为RecordField
,从记录上的字段中提取时间戳。设置属性
timestamp.field
记录字段的名称(time
在你的例子中)套
path.format
财产。这将允许您将文件存储到一小时,正如您在示例中提到的,忽略更精细的粒度(分钟、秒等)。同时设置
partition.duration.ms
为您提供有意义的粒度。重要的是-1
将不允许您使用基于时间的分区。最后,设置属性
locale
以及timezone
如果您使用的是预定义的分区器或自定义的基于时间的相关分区器,也可以这样做。请注意,连接器附带了一个预定义的基于时间的分区器类,您可能会发现它对您的用例很有用。您可以通过设置来使用它:
partitioner.class=io.confluent.connect.storage.partitioner.HourlyPartitioner