ApacheFlink:创建滞后数据流

bqujaahr  于 2021-06-25  发布在  Flink
关注(0)|答案(1)|浏览(369)

我刚开始使用scala使用ApacheFlink。有人能告诉我如何从当前的数据流中创建一个滞后流(滞后k个事件或k个时间单位)?
基本上,我想在一个数据流上实现一个自动回归模型(流上的线性回归和它本身的时滞版本)。因此,需要一个类似于以下伪代码的方法。

val ds : DataStream = ...

val laggedDS : DataStream = ds.map(lag _)

def lag(ds : DataStream, k : Time) : DataStream = {

}

如果每个事件的间隔为1秒,并且有2秒的延迟,则我期望这样的示例输入和输出。
输入:1,2,3,4,5,6,7。。。
输出:na,na,1,2,3,4,5。。。

j2datikz

j2datikz1#

考虑到我对您的要求是正确的,我将作为一个 FlatMapFunction 使用fifo队列。队列缓冲区 k 事件并在新事件到达时发射头部。如果需要容错流应用程序,则必须将队列注册为状态。flink将负责检查状态(即队列),并在出现故障时恢复状态。
这个 FlatMapFunction 可能是这样的:

class Lagger(val k: Int) 
    extends FlatMapFunction[X, X] 
    with Checkpointed[mutable.Queue[X]] 
{

  var fifo: mutable.Queue[X] = new mutable.Queue[X]()

  override def flatMap(value: X, out: Collector[X]): Unit = {
    // add new element to queue
    fifo.enqueue(value)
    if (fifo.size == k + 1) {
      // remove head element and emit
      out.collect(fifo.dequeue())
    }
  }

  // restore state
  override def restoreState(state: mutable.Queue[X]) = { fifo = state }

  // get state to checkpoint
  override def snapshotState(cId: Long, cTS: Long): mutable.Queue[X] = fifo

}

返回具有时滞的元素更为复杂。这将需要用于发射的计时器线程,因为只有在新元素到达时才调用该函数。

相关问题