flink:基于一些计算,用一个新的列来丰富数据集

x33g5p2x  于 2021-06-25  发布在  Flink
关注(0)|答案(1)|浏览(324)

我试图用一个数据集做一个简单的处理。
考虑一个具有两列类型的数据集 String . 我想在这个数据集中添加第三列 Long ,它累计到目前为止在数据集中看到的记录数。
例子:
输入:
a、 b类
b、 c级
c、 d级
输出:
a、 b,1号
b、 c,2个
c、 d,3个
我尝试了以下解决方案,但得到了一个奇怪的结果:

DataSet<Tuple2<String, String>> csvInput = env.readCsvFile("src/main/resources/data_file")
            .ignoreFirstLine()
            .includeFields("11")
            .types(String.class,String.class);

    long cnt=0;
    DataSet<Tuple3<String, String, Long>> csvOut2 = csvInput.map(new MyMapFunction(cnt));

private static class MyMapFunction implements MapFunction<Tuple2<String, String>, Tuple3<String, String, Long>> {

    long cnt;
    public MyMappingFunction(long cnt) {
        this.cnt = cnt;
    }

    @Override
    public Tuple3<String, String, Long> map(Tuple2<String, String> m) throws Exception {

        Tuple3 <String ,String, Long> resultTuple = new Tuple3(m.f0,m.f1, Long.valueOf(cnt));

        cnt++;
        return resultTuple;
    }
}

当我将此解决方案应用于一个包含100个条目的文件时,我得到的计数是47而不是100。计数器在53处重新启动。类似地,当我将它应用于一个更大的文件时,计数器会不时地重置,这样我就不会得到行的总数。
你能解释一下为什么我的实现是这样的吗?还有,有什么可能解决我的问题?
谢谢!

7z5jn7bk

7z5jn7bk1#

这是一个多线程问题。你有多少任务?
我必须在运行之前清理你的代码-我建议以后发布完整的工作示例,这样你就有机会得到更多的答案。
跟踪计数的方式不是线程安全的,因此如果有多个任务槽,则会出现计数值不准确的问题。
正确的计数方法,如dataartisans单词计数示例所示,是使用元组中的第3个槽来简单地存储值1,然后对数据集求和。

resultTuple = new Tuple3(m.f0,m.f1, 1L);

然后

csvOut2.sum(2).print();

其中2是包含值1的元组的索引。

相关问题