Flink: write tuples with a CSV header into a file

mkh04yzy · published 2021-06-21 in Flink

I am doing some data processing with Flink 1.7.1 (the Hadoop build). At the end, I would like to write a DataSet of 2-tuples into a file. Currently, I am doing it like this:

DataSet<Tuple2<Integer, Point>> pointsClustered = points.getClusteredPoints(...);
pointsClustered.writeAsCsv(params.get("output"), "\n", ",");

However, I would like to have a CSV header written into the first line. Flink's Javadoc API does not declare any option for this, and I could not find a solution on Google either.
Could you advise on how this can be done? Thanks!


bvuwiixz1#

I was able to get around this limitation by adding a header row to the DataSet with union. That way, the first row of the exported file will always be the header row.

// A one-element DataSet holding the header row, with the same arity as the data.
DataSet<Tuple8<String, String, String, String, String, String, String, String>> headers = env.fromElements(
        Tuple8.of(
            "SDMId", "ActivityType", "ActionType", "ActivityId", "ActivityLevel", "Timestamp", "SessionId", "Value"
        ));

// Prepend the header row to the actual results.
DataSet<Tuple8<String, String, String, String, String, String, String, String>> results =
        headers.union(skillResults);

// Parallelism 1 produces a single output file with the header in the first line.
results.writeAsCsv("file:///Users/karthsub/Desktop/first_export.csv", FileSystem.WriteMode.OVERWRITE).setParallelism(1);

kxxlusnw2#

Flink's own CsvOutputFormat does not support this. What you can do is to extend CsvOutputFormat and override its open method so that the header is written when the format is opened. Then you use DataSet#output to specify the newly created output format:

public static void main(String[] args) throws Exception {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    DataSource<Integer> input = env.fromElements(1, 2, 3);
    DataSet<Tuple3<Integer, String, Double>> result = input.map((MapFunction<Integer, Tuple3<Integer, String, Double>>) integer -> Tuple3.of(integer, integer.toString(), 42.0));

    Path outputPath = new Path("hdfs:///foobar");
    result.output(new MyCsvOutputFormat(outputPath));

    env.execute();
}

private static class MyCsvOutputFormat<T extends Tuple> extends CsvOutputFormat<T> {

    public MyCsvOutputFormat(Path outputPath) {
        super(outputPath);
    }

    @Override
    public void open(int taskNumber, int numTasks) throws IOException {
        // Open the underlying file first; the stream field is null before super.open().
        super.open(taskNumber, numTasks);
        // Write the header without closing the stream (a try-with-resources here
        // would close the stream and break the subsequent record writes).
        PrintWriter wrt = new PrintWriter(stream);
        wrt.println("Foo|bar|foobar");
        wrt.flush();
        // Note: with parallelism > 1, each parallel subtask's file gets its own header.
    }
}
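If the result is small enough to be collected to the client, a simpler workaround (not from the answers above, and assuming the data fits into memory) is to fetch it with DataSet#collect() and write the file with plain Java IO, header first. A minimal sketch of that idea, with hypothetical column names and literal rows standing in for the collected tuples:

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CollectAndWriteCsv {

    // Writes the header line first, then one comma-separated line per row.
    static void writeCsvWithHeader(Path file, String[] header, List<String[]> rows) throws IOException {
        List<String> lines = new ArrayList<>();
        lines.add(String.join(",", header));
        for (String[] row : rows) {
            lines.add(String.join(",", row));
        }
        Files.write(file, lines, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws IOException {
        // In the real job these rows would come from pointsClustered.collect(),
        // with each Tuple2<Integer, Point> converted to strings.
        List<String[]> rows = Arrays.asList(
                new String[]{"1", "A"},
                new String[]{"2", "B"});
        Path out = Files.createTempFile("clustered", ".csv");
        writeCsvWithHeader(out, new String[]{"id", "cluster"}, rows);
        System.out.println(String.join("\n", Files.readAllLines(out, StandardCharsets.UTF_8)));
        // prints:
        // id,cluster
        // 1,A
        // 2,B
    }
}
```

This loses all of Flink's parallel writing, so it is only appropriate for small result sets.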
