gpt4 book ai didi

scala - Spark mapWithState API 说明

转载 作者:行者123 更新时间:2023-12-03 03:55:58 24 4
gpt4 key购买 nike

我一直在 Spark Streaming 中使用 mapWithState API,但关于 StateSpec.function 有两件事不清楚:

假设我的功能是:

def trackStateForKey(batchTime: Time,
key: Long,
newValue: Option[JobData],
currentState: State[JobData]): Option[(Long, JobData)]
  1. 为什么新值是 Option[T] 类型?据我所知,它总是为我定义的,并且由于该方法应该用新状态调用,所以我真的不明白为什么它可以是可选的。

  2. 返回值是什么意思?我试图在文档和源代码中找到一些指针,但没有一个描述它的用途。由于我使用 state.remove()state.update() 修改键的状态,为什么我必须对返回值执行相同的操作?

    在我当前的实现中,如果删除 key ,我会返回 None;如果更新它,则会返回 Some(newState),但我不确定这是否正确.

最佳答案

Why is the new value an Option[T] type? As far as I've seen, it was always defined for me, and since the method is supposed to be called with a new state, I don't really see the point why it could be optional.

它是一个Option[T],因为如果您使用StateSpec.timeout设置超时,例如:

StateSpec.function(spec _).timeout(Milliseconds(5000))

那么函数超时后传入的值将为 None,并且 State[T] 上的 isTimingOut 方法将产生 true。这是有道理的,因为状态超时并不意味着指定键已到达新值,并且通常比为 T 传递 null 更安全(无论如何,这对基元不起作用),因为您希望用户安全地操作 Option[T]

您可以在 Sparks 实现中看到这一点:

// Get the timed out state records, call the mapping function on each and collect the
// data returned
if (removeTimedoutData && timeoutThresholdTime.isDefined) {
newStateMap.getByTime(timeoutThresholdTime.get).foreach { case (key, state, _) =>
wrappedState.wrapTimingOutState(state)
val returned = mappingFunction(batchTime, key, None, wrappedState) // <-- This.
mappedData ++= returned
newStateMap.remove(key)
}
}

What does the return value mean? I tried to find some pointers in the documentations and source code, but none of them describe what it is used for. Since I'm modifying the state of a key using state.remove() and state.update(), why would I have to do the same with return values?

返回值是沿着 Spark 图传递中间状态的一种方式。例如,假设我想要更新我的状态,但也使用中间数据在我的管道中执行一些操作,例如:

dStream
.mapWithState(stateSpec)
.map(optionIntermediateResult.map(_ * 2))
.foreachRDD( /* other stuff */)

正是这个返回值让我能够继续对所述数据进行操作。如果您不关心中间结果,只想要完整的状态,那么输出 None 就完全没问题了。

编辑:

我写了一个blog post (在这个问题之后)它试图对 API 进行深入的解释。

关于scala - Spark mapWithState API 说明,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38397688/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com