在flink中使用fold函数时出现scala错误

uelo1irk  于 2021-06-25  发布在  Flink
关注(0)|答案(1)|浏览(682)

代码如下:

env
  .addSource(...)
  .map(r => (0, r))
  .keyBy(0)
  .timeWindow(Time.seconds(30), Time.seconds(1))
  .fold(mutable.HashSet[String](),(a:(Int,String),b:mutable.HashSet[String])=>a)

编译时出错,错误消息为:
错误:类windowedstream中缺少方法fold的参数;如果要将其视为部分应用的函数timewindow(time.seconds(30),time.seconds(1)).fold(mutable.hashsetstring,
但windowedstream类中定义的函数是:
公共折叠(r initialvalue,foldfunction)

2izufjch

2izufjch1#

问题有两个方面:首先是 fold 函数需要 FoldFunction 如果使用scala,则在第二个参数列表中传递。第二,第一个参数 FoldFunction 应为聚合类型。因此,在您的情况下,它应该是 mutable.HashSet[String] . 下面的代码片段应该可以做到这一点:

env
  .addSource(...)
  .map(r => (0, r))
  .keyBy(0)
  .timeWindow(Time.seconds(30), Time.seconds(1))
  .fold(mutable.HashSet[String]()){
    (a: mutable HashSet[String], b: (Int, String)) => a
  }

注意Flink的 fold api调用已弃用。现在建议使用 aggregate api调用。

相关问题