ApacheFlink—利用并行性生成有序的窗口聚合(即前10个查询)

3ks5zfa0  于 2021-06-25  发布在  Flink
关注(0)|答案(1)|浏览(426)

我试图利用并行性来加速前10个窗口操作。我的应用程序由具有时间戳和密钥的事件和(即。, Tuple2<Long,String> )我的目标是为30分钟的滚动窗口(使用事件时间)生成前10个最频繁的键。为此,我的查询由入口、窗口和聚合阶段组成。换句话说,我的代码需要执行以下操作:

DataStream<Tuple3<Long, String, Integer>> s = env
    .readTextFile("data.csv")
    .map(new MapFunction<String, Tuple3<Long, String, Integer>>() {
      @Override
      public Tuple3<Long, String, Integer> map(String s) throws Exception {
        String[] tokens = s.split(",");
        return new Tuple3<Long, String, Integer>(Long.parseLong(tokens[0]),
            tokens[1], 1);
      }})
    .assignTimestampsAndWatermarks(
        new AscendingTimestampExtractor<Tuple3<Long, String, Integer>>() {
          @Override
          public long extractAscendingTimestamp(Tuple3<Long, String, Integer> t) {
            return t.f0;
          }}).setParallelism(1);

上面是解析csv文件中的数据并分配事件时间(即入口)的代码。之所以将parallelism设置为1,是因为我需要事件按顺序显示,以便将它们分配给windows。
接下来是棘手的部分,我尝试在生成正确(有序)的窗口结果的同时加快执行速度。
原始(串行)执行
以下代码提供了一个不使用任何并行性并生成串行流的解决方案:

DataStream<Tuple2<Long, String>> windowedTopTen = s
        .windowAll(TumblingEventTimeWindows.of(Time.minutes(30)))
        .apply(new SerialAggregation()).setParallelism(1);

哪里 SerialAggregation 延伸 RichAllWindowFunction<Tuple3<Long, String, Integer>, Tuple2<Long, String>, TimeWindow> 每扇翻滚的Windows Tuple2<Long, String> ( Long 是时间戳和 String 包含前10个键)。
naive方法生成正确的结果,结果数据流按时间戳升序排序。不幸的是,它没有利用多线程,因此当输入数据是一些gbs时,执行需要一段时间才能完成。
并行(更快)进近
在查看了flink在windows上的文档之后,我试图通过使用 parallelism > 1 同时为每个窗口生成正确的结果。因此,我看到我需要转变 sKeyedStream 然后应用 window() 转变。本质上:

DataStream<Tuple2<Long, String>> windowedTopTen = s
    .keyBy(1)
    .window(TumblingEventTimeWindows.of(Time.minutes(30)))
    .apply(new PartialAggregation()).setParallelism(N);

哪里 PartialAggregation() 将为不同的时间戳生成部分结果(不相交的键集)。换句话说,我的理解是,对于相同的时间戳 t1 我会以 partial_result_1partial_result_N 哪里 N 是我设定的平行度。我的目标是聚合特定时间戳的所有部分结果(如 t1 ),但我不知道怎么做。另外,当我能够将部分结果与匹配的时间戳相结合时,我将如何生成一个数据流,它的元组是基于时间戳排序的(就像原始解决方案生成的结果一样)。
问题
如何完成并行(更快)方法以生成所需的结果并将部分结果与匹配的时间戳相结合?
在我为每个时间戳合并部分结果之后,有没有一种方法可以生成一个数据流,其中的结果根据时间戳排序?

h43kikqp

h43kikqp1#

首先,如果将tuple2替换为tuple3,其中字符串是单个键,整数是计数器,那么将部分前10个结果合并到整体前10个结果会更容易。
然后,您可以使用windowall和聚集窗口函数添加第二层窗口,该函数保留前10个键(总键数)及其计数。

相关问题