我相信,当连续两次劈开时,Flink的行为很奇怪。我可能在我的实现逻辑中有一些错误,这就是为什么我在这里发帖询问您的意见。
最简单的例子:我有一个文本文件,包含苹果,香蕉和橙色的话。我在流执行环境中将其作为源代码传递。我做了第一次拆分,其中select条件是如果参数是单词“apple”。如果是的话,我就把它放在“主题”苹果上,否则就放在“主题”苹果上。然后我在这个分割流中选择“topic”notapples并再次分割它,但是这次条件检查参数是否是单词“orange”。如果是,则放在“主题”橙色区域,否则放在“主题”橙色区域。
最后,当我打印最后一个分割流的主题时,我期望的是只打印“banana”一词。然而,我实际上打印的是“苹果”和“香蕉”两个字。我注意到,当第二次拆分完成时,处理它的流不是只包含我从中选择的主题元素(即notapples)的流,而是包含所有元素的流。我错过什么了吗?
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> datastream = env.readTextFile("input.txt");
SplitStream<String> splitStream1 = datastream.split(new OutputSelector<String>() {
@Override
public Iterable<String> select(String arg0) {
List<String> output = new ArrayList<String>();
if (arg0.equals("Apple")) {
output.add("Apples");
} else {
output.add("NotApples");
}
return output;
}
});
DataStream<String> notApplesStream = splitStream1.select("NotApples");
SplitStream<String> splitStream2 = notApplesStream.split(new OutputSelector<String>() {
@Override
public Iterable<String> select(String arg0) {
List<String> output = new ArrayList<String>();
if (arg0.equals("Orange")) {
output.add("Oranges");
} else {
output.add("NotOranges");
}
return output;
}
});
DataStream<String> notApplesAndNotOrangesStream = splitStream2.select("NotOranges");
notApplesAndNotOrangesStream.print();
env.execute("SplitTest");
输出:
1> Apple
1> Apple
1> Banana
2> Apple
2> Apple
2> Apple
4> Apple
4> Apple
4> Banana
3> Apple
3> Banana
3> Apple
注意:我知道我可以使用一个split来实现相同的逻辑(在这个逻辑中,我检查参数是“apple”还是“organge”)。然而,这不是我的问题的重点。我最初在我编写的一个更复杂的程序中注意到这种行为,在这个程序中需要两个consequetive split,所以我决定尝试用一个最小的示例来重新创建它,以检查是否可以重新生成它。
2条答案
按热度按时间6ojccjat1#
最近讨论了邮件列表中的这种不正确行为,主题是“关于为datastream api弃用split/select”。我认为关键的评论是:
首先,我们必须承认当前split/select的实现是有缺陷的。我大致浏览了源代码,问题可能是对于连续的select/split(s),在streamgraph生成阶段,前者将被后者覆盖。这就是为什么我们在flink-11084中禁止这种连续逻辑。
在查看了flink-11084和由此产生的补丁之后,我相信如果您连续执行两次剥离/选择,flink的最新版本将抛出一个异常。
a0zr77ik2#
鉴于我对split/select是如何实现的所知,如果这不起作用,我也不会感到惊讶(尽管我还不太清楚)。此外,split/select最近被弃用(尽管还不清楚它是否真的会消失)。
分割/选择的更好方法是通过侧输出。这是一个更强大的机制,实现更简洁。