Flink Checkpoint滚动策略未根据定义的文件大小限制滚动文件

vojdkbi0  于 2022-12-09  发布在  Apache
关注(0)|答案(1)|浏览(318)

我正在使用Flink S3接收器,并且我已经创建了自定义检查点滚动策略,以根据文件大小定义滚动条件。
根据下面的代码,我的S3部分文件大小不应该超过100KB的大小,但我看到文件大小高达2 MB在我的S3桶。这背后的原因可能是什么?我如何才能强制Flink输出文件大小不应该超过100KB的大小?

FileSink.forBulkFormat(s3SinkPath, new ParquetWriterFactory<>(builder))
                    .withBucketAssigner(new CustomDateTimeBucketAssigner())
                    .withRollingPolicy(new CustomCheckpointRollingPolicy())
                    .build(); 



 public class CustomCheckpointRollingPolicy<IN, BucketID> extends CheckpointRollingPolicy<IN, BucketID> {

   

    private static final long maxSize = 100_000;  // 100KB

  

    @Override
    public boolean shouldRollOnEvent(PartFileInfo<BucketID> partFileState, IN element) throws IOException {
        return partFileState.getSize() >= maxSize;
    }

    @Override
    public boolean shouldRollOnProcessingTime(PartFileInfo<BucketID> partFileState, long currentTime) throws IOException {
        return false;
    }
}
ars1skjm

ars1skjm1#

您的自定义滚动策略不能按预期工作的原因是,实际上在您的情况下partFileState.getSize()始终为4。
这样做的原因是ParquetWriterFactory在文件输出流周围创建了ParquetBulkWriter的一个示例,并且它不公开内部parquetWriter的实际缓冲区大小。

@Override
    public long getSize() throws IOException {
        return currentPartStream.getPos();
    }

但是,它返回输出流中的当前位置(始终为4),因为ParquetWriter不会立即将数据发送到输出流,而是存储在缓冲区中,并且仅当页大小/行组大小达到AvroParquetWriter配置的目标值时才刷新。
为了能够获得实际的数据大小(但是没有压缩,因此实际上与 parquet 文件大小不匹配),您需要提供自定义的ParquetWriterFactory,它将返回具有额外公开方法的自定义大小感知ParquetBulkWriter

/**
     * @return the total size of data written to the file and buffered in memory
     */
    public Long getSize() {
        return this.parquetWriter.getDataSize();
    }

并实现自定义的BulkPartWriter,它将从上面的编写器中获取数据大小(而不是parquet文件大小!):

@Override
    public long getSize() throws IOException {
        return ((SizeAwareParquetBulkWriter) writer).getSize();
    }

相关问题