Apache Flink:创建滞后数据流

Apache Flink : Creating a Lagged Datastream

我刚开始使用 Scala 的 Apache Flink。有人可以告诉我如何从我拥有的当前数据流创建滞后流(滞后 k 个事件或 k 个时间单位)吗?

基本上,我想在数据流上实现一个自动回归模型(流上的线性回归及其自身的时间滞后版本)。因此,需要一种类似于以下伪代码的方法。

val ds : DataStream = ...

val laggedDS : DataStream = ds.map(lag _)

def lag(ds : DataStream, k : Time) : DataStream = {

}

如果每个事件间隔为 1 秒并且有 2 秒滞后,我希望样本输入和输出像这样。

输入:1、2、3、4、5、6、7...
输出:NA, NA, 1, 2, 3, 4, 5...

鉴于我的要求是正确的,我会将其实现为带有 FIFO 队列的 FlatMapFunction。队列缓冲 k 个事件并在新事件到达时发出头部。如果您需要容错流应用程序,队列必须注册为状态。然后 Flink 将负责检查状态(即队列)并在发生故障时恢复它。

FlatMapFunction 可能如下所示:

class Lagger(val k: Int) 
    extends FlatMapFunction[X, X] 
    with Checkpointed[mutable.Queue[X]] 
{

  var fifo: mutable.Queue[X] = new mutable.Queue[X]()

  override def flatMap(value: X, out: Collector[X]): Unit = {
    // add new element to queue
    fifo.enqueue(value)
    if (fifo.size == k + 1) {
      // remove head element and emit
      out.collect(fifo.dequeue())
    }
  }

  // restore state
  override def restoreState(state: mutable.Queue[X]) = { fifo = state }

  // get state to checkpoint
  override def snapshotState(cId: Long, cTS: Long): mutable.Queue[X] = fifo

}

返回具有时间延迟的元素更加复杂。这将需要计时器线程进行发射,因为该函数仅在新元素到达时调用。