我可以更改Flink Streaming File Sink的输出路径格式吗?

ujv3wf0j  于 2023-02-01  发布在  Apache
关注(0)|答案(1)|浏览(143)
    • bounty将在7天后过期**。回答此问题可获得+50声望奖励。Rinze希望引起更多人关注此问题。

我使用Pyflink和Streaming API将数据同步到文件系统中,输出文件的路径如下:

-2023-01-28--01
 |-part-xxx-0.json
-2023-01-28--03
 |-part-xxx-0.json

输出文件的路径格式好像是{year}-{month}-{day}--{hour}/part-xxx-{commit}.json,我怎样才能把路径格式改成{year}/{month}/{day}/{hour}/part-xxx-{commit}.json这样的格式呢?

yhived7q

yhived7q1#

编写一个自定义类来扩展DateTimeBucketAssigner并覆盖getBucketId方法中的路径生成逻辑
下面是一个示例-保存到一个带有前缀的路径作为POJO类名:

public class DateTimeWithClassPrefixBucketAssigner<IN> extends DateTimeBucketAssigner {
....
    @Override
    public String getBucketId(Object element, Context context) {
        if (dateTimeFormatter == null) {
            dateTimeFormatter = DateTimeFormatter.ofPattern(formatString).withZone(zoneId);
        }
        String prefix = element.getClass().getSimpleName();
        return prefix + "/" + dateTimeFormatter.format(Instant.ofEpochMilli(context.currentProcessingTime()));
    }
}

结束转换数据格式

import java.text.SimpleDateFormat;
...
    String input = "2022-01-31--10";
    String output = new SimpleDateFormat("{year}/{month}/{day}/{hour}/part-xxx.json").format(
            new SimpleDateFormat("yyyy-MM-dd--HH").parse(input)

相关问题