我对使用apache-flink和scala还比较陌生,我刚刚掌握了一些基本功能。我在尝试实现一个定制时遇到了麻烦 WindowFunction
.
问题是当我尝试实现一个自定义 WindowFunction
ide在“.apply()”函数中出现错误
Cannot resolve symbol apply
Unspecified value parameters: foldFunction: (NotInferedR, Data.Fingerprint) => NotInferedR, windowFunction: (Tuple, TimeWindow, Iterable[NotInferedR], Collector[NotInferedR]) => Unit
Unspecified value parameters: foldFunction: FoldFunction[Data.Fingerprint, NotInferedR], function: WindowFunction[NotInferedR, NotInferedR, Tuple, TimeWindow]
Unspecified value parameters: function: WindowFunction[Data.Fingerprint, NotInferedR, Tuple, TimeWindow]
Unspecified value parameters: windowFunction: (Tuple, TimeWindow, Iterable[Data.Fingerprint], Collector[NotInferedR]) => Unit
Type mismatch, expected: (Tuple, TimeWindow, Iterable[Data.Fingerprint], Collector[NotInferedR]) => Unit, actual: DataTimeWindow.DataWindow
Type mismatch, expected: WindowFunction[Data.Fingerprint, NotInferedR, Tuple, TimeWindow], actual: DataTimeWindow.DataWindow
这是我的密码:
val test = hashMap
.keyBy("hash")
.timeWindow(Time.minutes(1))
.apply(new DataWindow())
这就是 WindowFunction
:
class DataWindow extends WindowFunction[Data.Fingerprint, String, String, TimeWindow] {
override def apply(key: String,
window: TimeWindow,
input: Iterable[Fingerprint],
out: Collector[String]) {
out.collect("helo")
}
}
1条答案
按热度按时间5rgfhyps1#
我想问题出在
WindowFunction
,即键的类型。因为你在keyBy
方法作为字符串(keyBy("hash")
),则无法在编译时确定键的类型。有两种解决方法:使用
KeySelector
中的函数keyBy
取出钥匙(例如keyBy(x: FingerPrint => x.hash)
). 的返回类型KeySelector
函数在编译时是已知的,所以可以使用类型化WindowFunction
.更改的第三个类型参数的类型
WindowFunction
至Tuple
.Tuple
是由提取的密钥的通用持有者keyBy
. 在你的情况下,这将是一个Tuple1
哈希字符串可以通过Tuple1.f0
.