flink流式字数聚合

tgabmvqs  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(369)

我碰到了一类问题,这些问题在批处理中是不存在的,但在流式处理的情况下似乎并不重要。让我们考虑一下经典的字数计算示例:

lines
  .flatMap(_.split("\\W+"))
  .map(word => (word, 1))
  .keyBy(0)
  .sum(1)

这将打印流中每个单词的结果,例如:

input: "foo bar baz foo"
output: (foo, 1) (bar, 1) (baz, 1) (foo, 2)

我想做的是将每一行作为一个整体进行处理,然后才打印结果,即在每一行上使用一个窗口:

input: "foo bar baz foo"
output: (foo, 2) (bar, 1) (baz, 1)

显然,基于时间和基于计数的窗口在这里都不适用。解决这个问题的正确方法是什么?

1cklez4t

1cklez4t1#

即使在批处理模式下,也无法并行处理字和行,因为嵌套 groupBy (或 keyBy )在flink中不支持。但是,如果您需要以下批字计数的流式版本:

lines
  .flatMap(line => (lineId,word,1))
  .groupBy(0)
  .reduceGroup {aggregateWords}

哪里 aggregateWords 迭代该特定键的单词并对它们进行计数,然后可以按以下方式实现:对于每一行,您在末尾发出单词和一个特殊记录,然后使用一个带有自定义触发器的globalwindow,该触发器在收到特殊记录后触发。
以前批处理作业的流式版本可能如下所示:

public static void main(String[] args) throws Exception {

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    env.fromElements("foo bar baz foo", "yes no no yes", "hi hello hi hello")

            .flatMap(new FlatMapFunction<String, Tuple3<Double, String, Integer>>() {
                @Override
                public void flatMap(String s, Collector<Tuple3<Double, String, Integer>> collector) throws Exception {
                    String[] words = s.split("\\W+");
                    Double lineId = Math.random();
                    for (String w : words) {
                        collector.collect(Tuple3.of(lineId, w, 1));
                    }
                    collector.collect(Tuple3.of(lineId, "\n", 1));
                }
            })
            .keyBy(0)
            .window(GlobalWindows.create())
            .trigger(new Trigger<Tuple3<Double, String, Integer>, GlobalWindow>() {
                @Override
                public TriggerResult onElement(Tuple3<Double, String, Integer> element, long l, GlobalWindow globalWindow, TriggerContext triggerContext) throws Exception {
                    if (element.f1.equals("\n")) {
                        return TriggerResult.FIRE;
                    }
                    return TriggerResult.CONTINUE;
                }

                @Override
                public TriggerResult onProcessingTime(long l, GlobalWindow globalWindow, TriggerContext triggerContext) throws Exception {
                    return TriggerResult.CONTINUE;
                }

                @Override
                public TriggerResult onEventTime(long l, GlobalWindow globalWindow, TriggerContext triggerContext) throws Exception {
                    return TriggerResult.CONTINUE;
                }
            })
            .fold(new HashMap<>(), new FoldFunction<Tuple3<Double, String, Integer>, HashMap<String, Integer>>() {
                @Override
                public HashMap<String, Integer> fold(HashMap<String, Integer> hashMap, Tuple3<Double, String, Integer> tuple3) throws Exception {
                    if (!tuple3.f1.equals("\n")) {
                        hashMap.put(tuple3.f1, hashMap.getOrDefault(tuple3.f1, 0) + 1);
                    }
                    return hashMap;
                }
            }).print();

    env.execute("Test");

}

输出:

{bar=1, foo=2, baz=1}
{no=2, yes=2}
{hi=2, hello=2}

相关问题