apache flink-sum和keep分组

afdcj2ne  于 2021-06-25  发布在  Flink
关注(0)|答案(1)|浏览(544)

假设我有这样的记录:

("a-b", "data1", 1)
("a-c", "data2", 1)
("a-b", "data3", 1)

如何在apache flink中进行分组和求和,从而得到以下结果?

("a-b", ["data1", "data3"], 2)
("a-c", ["data2"], 1)

你好,凯文

odopli94

odopli941#

我是在Flink炮弹里做到的( $FLINK_HOME/bin/start-scala-shell.sh local )代码如下:

import org.apache.flink.util.Collector
benv.
  fromElements(("a-b", "data1", 1), ("a-c", "data2", 1), ("a-b", "data3", 1)).
  groupBy(0).
  reduceGroup { 
    (it: Iterator[(String, String, Int)], out: Collector[(String, List[String], Int)]) => {
      // Watch out: if the group is _very_ large this can lead to OOM errors
      val group = it.toList
      // For all groups with at least one element (prevent out-of-bounds)
      if (group.length > 0)
        // Get the "name", all the elements and their third-column aggregate
        out.collect((group(0)._1, group.map(_._2), group.map(_._3).sum))
    }
  }.print

具有以下输出

(a-b,List(data1, data3),2)
(a-c,List(data2),1)

相关问题