我正在使用Flink S3接收器,并且我已经创建了自定义检查点滚动策略,以根据文件大小定义滚动条件。
根据下面的代码,我的S3部分文件大小不应该超过100KB的大小,但我看到文件大小高达2 MB在我的S3桶。这背后的原因可能是什么?我如何才能强制Flink输出文件大小不应该超过100KB的大小?
FileSink.forBulkFormat(s3SinkPath, new ParquetWriterFactory<>(builder))
.withBucketAssigner(new CustomDateTimeBucketAssigner())
.withRollingPolicy(new CustomCheckpointRollingPolicy())
.build();
public class CustomCheckpointRollingPolicy<IN, BucketID> extends CheckpointRollingPolicy<IN, BucketID> {
private static final long maxSize = 100_000; // 100KB
@Override
public boolean shouldRollOnEvent(PartFileInfo<BucketID> partFileState, IN element) throws IOException {
return partFileState.getSize() >= maxSize;
}
@Override
public boolean shouldRollOnProcessingTime(PartFileInfo<BucketID> partFileState, long currentTime) throws IOException {
return false;
}
}
1条答案
按热度按时间ars1skjm1#
您的自定义滚动策略不能按预期工作的原因是,实际上在您的情况下
partFileState.getSize()
始终为4。这样做的原因是
ParquetWriterFactory
在文件输出流周围创建了ParquetBulkWriter
的一个示例,并且它不公开内部parquetWriter
的实际缓冲区大小。但是,它返回输出流中的当前位置(始终为4),因为
ParquetWriter
不会立即将数据发送到输出流,而是存储在缓冲区中,并且仅当页大小/行组大小达到AvroParquetWriter
配置的目标值时才刷新。为了能够获得实际的数据大小(但是没有压缩,因此实际上与 parquet 文件大小不匹配),您需要提供自定义的
ParquetWriterFactory
,它将返回具有额外公开方法的自定义大小感知ParquetBulkWriter
并实现自定义的
BulkPartWriter
,它将从上面的编写器中获取数据大小(而不是parquet文件大小!):