我试图深入了解flink中每个Slot内的数据,以了解数据是如何准确分布的。但对我来说,要知道确切的位置是什么真的很困惑。我正在使用一个带有小文本文件的单词计数示例,我想知道每个插槽中有什么数据,或者更具体地说,每个操作符示例将处理哪些数据,可以通过在该操作符或槽内打印数据。
这是我在本地环境中工作的代码:
ExecutionEnvironment ENV = ExecutionEnvironment.getExecutionEnvironment();
// ENV.setParallelism(Runtime.getRuntime().availableProcessors());
ENV.setParallelism(4);
DataSet<String> input1 = ENV.readTextFile(inputPathTesting);
DataSet<Tuple2<String,Integer>> wordTuples=input1
.flatMap(new Tokenizer());
wordTuples.writeAsText(outputPath);
ENV.execute("WordCount");
我仍然真的不知道flink中的数据分布是如何工作的,以及为什么有些接收器没有数据可写,而其他人有双倍的数据量。Anny建议或指南将有所帮助,提前感谢。所以我的目标是了解
注意:当我从文本文件中阅读时(这是4行,长度略有不同),如果我在smalles行中添加三个字母,这将改变结果中的数据分布。
更新:所以我到现在为止所能做的是:
DataSet<Tuple2<String, Integer>> textData =input
.filter(line -> !line.isEmpty())
.flatMap(new Tokenizer())
.map(new RichMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(Tuple2<String, Integer> value) {
int taskId = getRuntimeContext().getIndexOfThisSubtask();
return new Tuple2<>(value.f0, taskId);
}
});
和令牌化器
public static class Tokenizer extends RichMapFunction implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
int taskSlotIndex = getRuntimeContext().getIndexOfThisSubtask();
taskSlotIndex++;
if (!value.isEmpty()) { // ignore empty lines
out.collect(new Tuple2<>(value, taskSlotIndex));
}
}
@Override
public Object map(Object o) throws Exception {
return null;
}
因此,使用RichMapFunction我可以访问RunTimeContext,因此子任务的索引,然后打印处理该行的TaskSlot的索引行。这种方式是正确的吗?是否有更好的方法来了解每个Slot中的数据?
1条答案
按热度按时间mqkwyuun1#
如果使用
print()
而不是writeAsText
,则每行输出都将以子任务索引为前缀。类似于: