gpt4 book ai didi

apache-flink - 弗林克 : Stateful stream processing by key

转载 作者:行者123 更新时间:2023-12-04 04:48:32 24 4
gpt4 key购买 nike

我有数据流,例如带有 ID 的 JSON 记录。

我想处理数据,以便所有具有相同键的记录都由相同的有状态任务处理。

我怎样才能做到这一点?

最佳答案

这可以通过 KeyedStream 上的有状态运算符来完成。 .一个 KeyedStream将所有记录分区到一个键上,并确保所有具有相同键的记录进入同一个操作符实例并与相同的状态交互。

在代码中,这看起来像:

val stream: DataStream[(String, Long)] = ???
val sumByKey: DataStream[(String, Long)] = stream
.keyBy(_._1) // key on the first attribute
.map(new SumMapper())

class SumMapper extends RichMapFunction[(String, Long), (String, Long)] {

var sumState: ValueState[Long] = _

override def open(config: Configuration) {
// configure state
val sumDesc: ValueStateDescriptor[Long] =
new ValueStateDescriptor[Long]("sum", classOf[Long])
sumState = getRuntimeContext.getState(sumDesc)
}

override def map(in: (String, Long)): (String, Long) = {
val sum = sumState.value() // get current sum from state
val newSum = sum + in._2 // compute new sum
sumState.update(newSum) // update state
(in._1, newSum) // emit result
}
}

关于apache-flink - 弗林克 : Stateful stream processing by key,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44919205/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com