不能使用windowstream.apply()函数应用windowfunction

qij5mzcb  于 2021-06-25  发布在  Flink
关注(0)|答案(1)|浏览(365)

我对使用apache-flink和scala还比较陌生,我刚刚掌握了一些基本功能。我在尝试实现一个定制时遇到了麻烦 WindowFunction .
问题是当我尝试实现一个自定义 WindowFunction ide在“.apply()”函数中出现错误

Cannot resolve symbol apply

Unspecified value parameters: foldFunction: (NotInferedR, Data.Fingerprint) => NotInferedR, windowFunction: (Tuple, TimeWindow, Iterable[NotInferedR], Collector[NotInferedR]) => Unit 
Unspecified value parameters: foldFunction: FoldFunction[Data.Fingerprint, NotInferedR], function: WindowFunction[NotInferedR, NotInferedR, Tuple, TimeWindow] 
Unspecified value parameters: function: WindowFunction[Data.Fingerprint, NotInferedR, Tuple, TimeWindow] 
Unspecified value parameters: windowFunction: (Tuple, TimeWindow, Iterable[Data.Fingerprint], Collector[NotInferedR]) => Unit 

Type mismatch, expected: (Tuple, TimeWindow, Iterable[Data.Fingerprint], Collector[NotInferedR]) => Unit, actual: DataTimeWindow.DataWindow 
Type mismatch, expected: WindowFunction[Data.Fingerprint, NotInferedR, Tuple, TimeWindow], actual: DataTimeWindow.DataWindow

这是我的密码:

val test = hashMap
      .keyBy("hash")
      .timeWindow(Time.minutes(1))
      .apply(new DataWindow())

这就是 WindowFunction :

class DataWindow extends WindowFunction[Data.Fingerprint, String, String, TimeWindow]  {

    override def apply(key: String,
                       window: TimeWindow,
                       input: Iterable[Fingerprint],
                       out: Collector[String]) {

      out.collect("helo")
    }
  }
5rgfhyps

5rgfhyps1#

我想问题出在 WindowFunction ,即键的类型。因为你在 keyBy 方法作为字符串( keyBy("hash") ),则无法在编译时确定键的类型。有两种解决方法:
使用 KeySelector 中的函数 keyBy 取出钥匙(例如 keyBy(x: FingerPrint => x.hash) ). 的返回类型 KeySelector 函数在编译时是已知的,所以可以使用类型化 WindowFunction .
更改的第三个类型参数的类型 WindowFunctionTuple . Tuple 是由提取的密钥的通用持有者 keyBy . 在你的情况下,这将是一个 Tuple1 哈希字符串可以通过 Tuple1.f0 .

相关问题