我用apache flink在hdfs上创建了一些归档数据文件,生成的文件名有part-{parallel task}-{count}这样的模式,但我期望的应该有“.gz”后缀,可以由apache spark直接加载。
我在ApacheFlink中找不到任何api来为bucketingsink生成的最终完成文件添加后缀,但只能为inprogress、pending和validlength状态添加后缀。有人能帮忙吗?hdfs连接器和javaapi
我用apache flink在hdfs上创建了一些归档数据文件,生成的文件名有part-{parallel task}-{count}这样的模式,但我期望的应该有“.gz”后缀,可以由apache spark直接加载。
我在ApacheFlink中找不到任何api来为bucketingsink生成的最终完成文件添加后缀,但只能为inprogress、pending和validlength状态添加后缀。有人能帮忙吗?hdfs连接器和javaapi
1条答案
按热度按时间c7rzv4ha1#
据我所知,没有使用默认bucketingsink添加后缀的选项。
一个选项是不使用检查点,并将挂起的后缀设置为所需的后缀。但由于检查点在大多数情况下都是可取的,所以这并不是最佳的。
我的解决方案是创建一个bucketingsinkwithsuffix实现,它几乎是默认bucketingsink的精确副本。唯一需要更改的是为后缀添加一个成员变量,后缀可以在构造函数中设置,并调整创建基路径的方式。
以下是我对构造函数的实现:
以及用于生成基本路径(第523和528行):