如何避免使用apache flink在word count中使用重复的键元组

x6492ojm  于 2021-06-24  发布在  Flink
关注(0)|答案(1)|浏览(627)

我在学习apache flink时,正在尝试使用它编写一个简单的字数计算程序。
问题是我无法在结果中去掉重复的键元组。
输入:

a
aaa
ab
aaa
a
a

输出:

(a,1)
(a,2)
(a,3)
(aaa,1)
(aaa,2)
(ab, 1)

预期产量:

(a,3)
(aaa,2)
(ab, 1)

我的代码:

public static void main(String[] args) throws Exception {
  final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

  DataStream<String> text = env.readTextFile("data.in");

  DataStream<Tuple2<String, Integer>> counts = text
    .map(s -> Tuple2.of(s, 1))
    .returns(Types.TUPLE(Types.STRING, Types.INT))
    .keyBy(0)
    .sum(1);

  counts.print();
  env.execute();
}
rur96b6h

rur96b6h1#

flink的流式api并不是为产生您期望的结果而设计的。相反,流处理背后的思想是,输入可能是无限的——换句话说,输入将持续地、永远地到达。在实践中,是的,输入可能会终止,但话说回来,可能不会。
因为flink并不期望流式输入会终止,所以不能期望它等到最后才产生结果。相反,flink的datastreamapi是围绕着产生连续结果的连续输入的思想来组织的。每个新的输入事件都可能产生一个更新的结果。
然而,有一种方法可以实现您想要的,同时仍然使用datastreamapi,但是它有点复杂。
结果是,当您将flink与有界输入源(如文件)一起使用时,当它到达有界输入的结尾时,将通过作业图发送一个信号,指示已到达结尾。实际上,你可以等待这个信号,然后才产生结果。
我说的这个信号实际上是一个水印,它的值是max\u水印。因此,您可以让processfunction为遥远的将来的某个时间点设置事件时间计时器。此计时器仅在出现此特殊水印时启动。同时,这个processfunction应该监视流,跟踪最新的结果(对于每个键)——当这个计时器在接收到这个非常大的水印后最终触发时,它将只收集到输出。
或者您可以只使用flink的dataset api,它是围绕批处理组织的。那你就会得到你想要的。

相关问题