我正在尝试使用flink流媒体来消费kafka主题消息,并创建(定期)将保存在s3上的Parquet文件。
在使用大容量格式的流文件接收器时,有没有办法将创建的部分文件名(或添加后缀/前缀)更改为比part-0-0或part-1-3更唯一?
StreamingFileSink<> sink = StreamingFileSink.forBulkFormat(new Path("s3://test-bucket/"), ParquetAvroFactory.getParquetWriter(schema, CompressionCodec.UNCOMPRESSED.name()))
.withBucketAssigner(new PartitionBucketAssigner(partitionColumns))
.build();
1条答案
按热度按时间brvekthn1#
可以重写getbucketid方法(请参见https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketassigner.html)在bucketassigner上,这将影响路径,但显然不会影响零件文件名(请参见下面的注解)。
零件文件名在中的这段代码中建立
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket
:似乎不是为了定制而设计的。