我在学习apache flink时,正在尝试使用它编写一个简单的字数计算程序。
问题是我无法在结果中去掉重复的键元组。
输入:
a
aaa
ab
aaa
a
a
输出:
(a,1)
(a,2)
(a,3)
(aaa,1)
(aaa,2)
(ab, 1)
预期产量:
(a,3)
(aaa,2)
(ab, 1)
我的代码:
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> text = env.readTextFile("data.in");
DataStream<Tuple2<String, Integer>> counts = text
.map(s -> Tuple2.of(s, 1))
.returns(Types.TUPLE(Types.STRING, Types.INT))
.keyBy(0)
.sum(1);
counts.print();
env.execute();
}
1条答案
按热度按时间rur96b6h1#
flink的流式api并不是为产生您期望的结果而设计的。相反,流处理背后的思想是,输入可能是无限的——换句话说,输入将持续地、永远地到达。在实践中,是的,输入可能会终止,但话说回来,可能不会。
因为flink并不期望流式输入会终止,所以不能期望它等到最后才产生结果。相反,flink的datastreamapi是围绕着产生连续结果的连续输入的思想来组织的。每个新的输入事件都可能产生一个更新的结果。
然而,有一种方法可以实现您想要的,同时仍然使用datastreamapi,但是它有点复杂。
结果是,当您将flink与有界输入源(如文件)一起使用时,当它到达有界输入的结尾时,将通过作业图发送一个信号,指示已到达结尾。实际上,你可以等待这个信号,然后才产生结果。
我说的这个信号实际上是一个水印,它的值是max\u水印。因此,您可以让processfunction为遥远的将来的某个时间点设置事件时间计时器。此计时器仅在出现此特殊水印时启动。同时,这个processfunction应该监视流,跟踪最新的结果(对于每个键)——当这个计时器在接收到这个非常大的水印后最终触发时,它将只收集到输出。
或者您可以只使用flink的dataset api,它是围绕批处理组织的。那你就会得到你想要的。