我使用Pyflink和Streaming API将数据同步到文件系统中,输出文件的路径如下:
-2023-01-28--01
|-part-xxx-0.json
-2023-01-28--03
|-part-xxx-0.json
输出文件的路径格式好像是{year}-{month}-{day}--{hour}/part-xxx-{commit}.json
,我怎样才能把路径格式改成{year}/{month}/{day}/{hour}/part-xxx-{commit}.json
这样的格式呢?
1条答案
按热度按时间yhived7q1#
编写一个自定义类来扩展DateTimeBucketAssigner并覆盖getBucketId方法中的路径生成逻辑
下面是一个示例-保存到一个带有前缀的路径作为POJO类名:
结束转换数据格式