作者热门文章
- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我刚刚开始使用 Scala 来使用 Apache Flink。有人可以告诉我如何从我拥有的当前数据流创建滞后流(滞后于 k 个事件或 k 个时间单位)吗?
基本上,我想在数据流上实现一个自动回归模型(流上的线性回归及其自身的时间滞后版本)。因此,需要一个类似于以下伪代码的方法。
val ds : DataStream = ...
val laggedDS : DataStream = ds.map(lag _)
def lag(ds : DataStream, k : Time) : DataStream = {
}
如果每个事件的间隔为 1 秒并且有 2 秒的滞后,我希望示例输入和输出如下所示。
输入:1、2、3、4、5、6、7...
输出:NA、NA、1、2、3、4、5...
最佳答案
鉴于我的要求是正确的,我会将其实现为带有 FIFO 队列的 FlatMapFunction
。队列缓冲 k
个事件,并在新事件到达时发出头部。如果您需要容错流应用程序,则必须将队列注册为状态。然后,Flink 将负责检查状态(即队列)并在发生故障时恢复它。
FlatMapFunction
可能如下所示:
class Lagger(val k: Int)
extends FlatMapFunction[X, X]
with Checkpointed[mutable.Queue[X]]
{
var fifo: mutable.Queue[X] = new mutable.Queue[X]()
override def flatMap(value: X, out: Collector[X]): Unit = {
// add new element to queue
fifo.enqueue(value)
if (fifo.size == k + 1) {
// remove head element and emit
out.collect(fifo.dequeue())
}
}
// restore state
override def restoreState(state: mutable.Queue[X]) = { fifo = state }
// get state to checkpoint
override def snapshotState(cId: Long, cTS: Long): mutable.Queue[X] = fifo
}
返回具有时间滞后的元素更加复杂。这将需要计时器线程来进行发射,因为该函数仅在新元素到达时才会被调用。
关于scala - Apache 弗林克 : Creating a Lagged Datastream,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39199358/
我是一名优秀的程序员,十分优秀!