我刚开始使用scala使用ApacheFlink。有人能告诉我如何从当前的数据流中创建一个滞后流(滞后k个事件或k个时间单位)?
基本上,我想在一个数据流上实现一个自动回归模型(流上的线性回归和它本身的时滞版本)。因此,需要一个类似于以下伪代码的方法。
val ds : DataStream = ...
val laggedDS : DataStream = ds.map(lag _)
def lag(ds : DataStream, k : Time) : DataStream = {
}
如果每个事件的间隔为1秒,并且有2秒的延迟,则我期望这样的示例输入和输出。
输入:1,2,3,4,5,6,7。。。
输出:na,na,1,2,3,4,5。。。
1条答案
按热度按时间j2datikz1#
考虑到我对您的要求是正确的,我将作为一个
FlatMapFunction
使用fifo队列。队列缓冲区k
事件并在新事件到达时发射头部。如果需要容错流应用程序,则必须将队列注册为状态。flink将负责检查状态(即队列),并在出现故障时恢复状态。这个
FlatMapFunction
可能是这样的:返回具有时滞的元素更为复杂。这将需要用于发射的计时器线程,因为只有在新元素到达时才调用该函数。