gpt4 book ai didi

apache-spark - Spark 有状态结构化流 : State getting too big in mapGroupsWithState

转载 作者:行者123 更新时间:2023-12-04 14:11:40 26 4
gpt4 key购买 nike

我正在尝试使用 mapGroupsWithState 用于传入数据流的有状态结构化流的方法。但我面临的问题是我为 选择的 key groupByKey 使我的状态太大太快。显而易见的出路是更改 key ,但更改我希望在 中应用的业务逻辑。更新 方法,要求 key 与我现在拥有的完全相同,或者如果可能,访问 GroupState 对于所有键。
例如,我有来自各种组织的数据流,通常一个组织包含 userId、personId 等。请参阅下面的代码:

val stream: Dataset[User] = dataFrame.as[User]
val noTimeout = GroupStateTimeout.NoTimeout
val statisticStream = stream
.groupByKey(key => key.orgId)
.mapGroupsWithState(noTimeout)(updateUserStatistic)

val df = statisticStream.toDF()

val query = df
.writeStream
.outputMode(Update())
.option("checkpointLocation", s"$checkpointLocation/$name")
.foreach(new UserCountWriter(spark.sparkContext.getConf))
.outputMode(Update())
.queryName(name)
.trigger(Trigger.ProcessingTime(Duration.apply("10 seconds")))
案例类:
case class User(
orgId: Long,
profileId: Long,
userId: Long)

case class UserStatistic(
orgId: Long,
known: Long,
uknown: Long,
userSeq: Seq[User])
更新方法:
def updateUserStatistic(
orgId: Long,
newEvents: Iterator[User],
oldState: GroupState[UserStatistic]): UserStatistic = {
var state: UserStatistic = if (oldState.exists) oldState.get else UserStatistic(orgId, 0L, 0L, Seq.empty)
for (event <- newEvents) {
//business logic like checking if userId in this organization is of certain type and then accordingly update the known or unknown attribute for that particular user.
oldState.update(state)
state
}
当我必须在 Driver-Executor 模型上执行此操作时,问题会变得更糟,因为我预计每个组织中有 1-1000 万用户,这可能意味着单个执行程序上有这么多状态(如果我理解有误,请纠正我。)
失败的可能解决方案:
  • 按用户 ID 键分组 - 因为这样我就无法获得给定 orgId 的所有用户 ID,因为这些 GroupState 被放在聚合键、值对中,在这里,它是用户 ID。所以对于每一个新的 UserId,都会创建一个新的状态,即使它属于同一个组织。

  • 任何帮助或建议表示赞赏。

    最佳答案

    您的状态不断增加,因为在当前实现中,不会从 GroupState 中删除任何键/状态对。
    为了准确缓解您面临的问题(无限增加状态)mapGroupsWithState方法允许您使用 超时 .您可以在两种类型的超时之间进行选择:

  • 使用 GroupStateTimeout.ProcessingTimeTimeout 的处理时间超时与 GroupState.setTimeoutDuration() , 或
  • 使用 GroupStateTimeout.EventTimeTimeout 的事件时间超时与 GroupState.setTimeoutTimestamp() .

  • 请注意,它们之间的区别是基于持续时间的超时和更灵活的基于时间的超时。
    在特征的 ScalaDocs 中 GroupState您会找到一个关于如何在映射函数中使用超时的不错模板:
    def mappingFunction(key: String, value: Iterator[Int], state: GroupState[Int]): String = {

    if (state.hasTimedOut) { // If called when timing out, remove the state
    state.remove()

    } else if (state.exists) { // If state exists, use it for processing
    val existingState = state.get // Get the existing state
    val shouldRemove = ... // Decide whether to remove the state
    if (shouldRemove) {
    state.remove() // Remove the state

    } else {
    val newState = ...
    state.update(newState) // Set the new state
    state.setTimeoutDuration("1 hour") // Set the timeout
    }

    } else {
    val initialState = ...
    state.update(initialState) // Set the initial state
    state.setTimeoutDuration("1 hour") // Set the timeout
    }
    ...
    // return something
    }

    关于apache-spark - Spark 有状态结构化流 : State getting too big in mapGroupsWithState,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63820145/

    26 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com