gpt4 book ai didi

scala - Spark mapWithState 更新状态输出

转载 作者:行者123 更新时间:2023-12-01 13:43:50 31 4
gpt4 key购买 nike

升级到 Spark 1.6.1 后,我开始重构应用程序以将 updateStateByKey 替换为 mapWithState

为了利用新 API 的性能优势,我不想调用加载所有状态的 stateSnapshots。我只想要更新的状态。

mapWithState API 返回一个 [key, input, state, output]DStream,其中每个状态都是经过部分更新后的状态输入被摄取。我如何从这个 DStream 中单独提取最新状态(即所有相应输入被摄取/映射后的状态)?

我可以在 MapWithStateDStream 上执行 map(删除输入和输出)和 reduceByKey,选择具有较新时间戳的状态(这是我在更新函数中设置的),但我不能保证不会有两个具有相同时间戳的部分状态,即使使用自定义的按键分区程序也是如此。

如何判断 mapWithStateMapWithStateDStream 输出中哪个部分状态是最新的?

最佳答案

mapWithState 只会为当前微批中正在更新的每个状态调用。实现您想要的目标的一种方法是在状态已更新的情况下返回 Some[S]

StateSpec.function 采用具有以下签名的方法:

mappingFunction: 
(Time, KeyType, Option[ValueType], State[StateType]) => Option[MappedType]

我们能做的是确保我们的 Option[MappedType] 在值更新时始终是 Some[MappedType],否则 None.

例如:

def updateState(key: Int, value: Option[Int], state: State[Int]): Option[Int] = {
value match {
case Some(something) if something > 10 =>
val updatedVal = something * something
state.update(updatedVal)
Some(updatedVal)
case _ => None
}
}

然后你可以做:

val spec = StateSpec.function(updateState _)
ssc.mapWithState(spec).filter(!_.isEmpty).foreachRDD(/* do stuff on updated state */)

通过这种方式,您可以过滤掉任何未更新的状态,只保留您正在查找的更新快照。

关于scala - Spark mapWithState 更新状态输出,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37347169/

31 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com