Flink 如何了解每个插槽或Operator示例中的数据?

voj3qocg  于 2023-04-10  发布在  Apache
关注(0)|答案(1)|浏览(124)

我试图深入了解flink中每个Slot内的数据,以了解数据是如何准确分布的。但对我来说,要知道确切的位置是什么真的很困惑。我正在使用一个带有小文本文件的单词计数示例,我想知道每个插槽中有什么数据,或者更具体地说,每个操作符示例将处理哪些数据,可以通过在该操作符或槽内打印数据。
这是我在本地环境中工作的代码:

ExecutionEnvironment ENV = ExecutionEnvironment.getExecutionEnvironment();
//        ENV.setParallelism(Runtime.getRuntime().availableProcessors());
        ENV.setParallelism(4);
        DataSet<String> input1 = ENV.readTextFile(inputPathTesting);
        DataSet<Tuple2<String,Integer>> wordTuples=input1
                .flatMap(new Tokenizer());
        wordTuples.writeAsText(outputPath);

        ENV.execute("WordCount");

我仍然真的不知道flink中的数据分布是如何工作的,以及为什么有些接收器没有数据可写,而其他人有双倍的数据量。Anny建议或指南将有所帮助,提前感谢。所以我的目标是了解
注意:当我从文本文件中阅读时(这是4行,长度略有不同),如果我在smalles行中添加三个字母,这将改变结果中的数据分布。
更新:所以我到现在为止所能做的是:

DataSet<Tuple2<String, Integer>> textData =input
            .filter(line -> !line.isEmpty()) 
            .flatMap(new Tokenizer())
            .map(new RichMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
                @Override
                public Tuple2<String, Integer> map(Tuple2<String, Integer> value) {
                    int taskId = getRuntimeContext().getIndexOfThisSubtask();
                    return new Tuple2<>(value.f0, taskId);
                }
            });

和令牌化器

public static class Tokenizer extends RichMapFunction implements FlatMapFunction<String, Tuple2<String, Integer>> {

    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
        int taskSlotIndex = getRuntimeContext().getIndexOfThisSubtask();
        taskSlotIndex++;
        if (!value.isEmpty()) { // ignore empty lines
            out.collect(new Tuple2<>(value, taskSlotIndex));
        }
    }

    @Override
    public Object map(Object o) throws Exception {
        return null;
    }

因此,使用RichMapFunction我可以访问RunTimeContext,因此子任务的索引,然后打印处理该行的TaskSlot的索引行。这种方式是正确的吗?是否有更好的方法来了解每个Slot中的数据?

mqkwyuun

mqkwyuun1#

如果使用print()而不是writeAsText,则每行输出都将以子任务索引为前缀。类似于:

2> (1577883600000,2013000185,33.0)
4> (1577883600000,2013000108,14.0)
3> (1577883600000,2013000087,14.0)
1> (1577883600000,2013000036,23.0)
4> (1577883600000,2013000072,13.0)
2> (1577883600000,2013000041,28.0)
3> (1577883600000,2013000123,33.0)
4> (1577883600000,2013000188,18.0)
1> (1577883600000,2013000098,23.0)
2> (1577883600000,2013000047,13.0)
...

相关问题