flink流-使用StreamingFileLink时更改部分文件名?

oug3syen  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(625)

我正在尝试使用flink流媒体来消费kafka主题消息,并创建(定期)将保存在s3上的Parquet文件。
在使用大容量格式的流文件接收器时,有没有办法将创建的部分文件名(或添加后缀/前缀)更改为比part-0-0或part-1-3更唯一?

StreamingFileSink<> sink = StreamingFileSink.forBulkFormat(new Path("s3://test-bucket/"),               ParquetAvroFactory.getParquetWriter(schema,  CompressionCodec.UNCOMPRESSED.name()))
.withBucketAssigner(new PartitionBucketAssigner(partitionColumns))
.build();
brvekthn

brvekthn1#

可以重写getbucketid方法(请参见https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketassigner.html)在bucketassigner上,这将影响路径,但显然不会影响零件文件名(请参见下面的注解)。
零件文件名在中的这段代码中建立 org.apache.flink.streaming.api.functions.sink.filesystem.Bucket :

private Path assembleNewPartPath() {
    return new Path(bucketPath, PART_PREFIX + '-' + subtaskIndex + '-' + partCounter);
}

似乎不是为了定制而设计的。

相关问题