gpt4 book ai didi

apache-spark - Spark Structured Streaming foreachBatch 和 UPSERT(合并): to persist or not to persist?

转载 作者:行者123 更新时间:2023-12-04 04:19:21 25 4
gpt4 key购买 nike

如果在使用 foreachBatch 的结构化流中进行状态聚合(任意)以将更新合并到增量表中,我是否应该在更新之前将批处理数据帧保留在 foreachBatch 中?

似乎不需要持久化,因为我正在写入单个数据接收器。

另一方面,我有强烈的感觉,不坚持会导致源重新扫描并触发聚合两次。

有什么意见/想法吗?

foreachBatch((VoidFunction2<Dataset<Row>, Long>) (batchDf, batchId) -> 
deltaTable.as("table").merge(batchDf.as("updates"), functions.expr("table.id=updates.id"))
.whenNotMatched().insertAll() // new session to be added
.whenMatched()
.updateAll()
.execute())

最佳答案

因此 delta 用户 (https://groups.google.com/g/delta-users/c/Ihm6PMilCdI) 的答案是:

DeltaTable.merge (upsert) does two passes on the source data.

因此,如果您确实关心 mapGroupsWithState 内任意状态聚合中的 Spark 指标或日志/flatmapGroupsWithState - 在内部合并之前进行持久化/缓存 foreachBatch , 否则发送的指标将具有双倍 (x2) 值并且日志聚合日志将被发出两次

关于apache-spark - Spark Structured Streaming foreachBatch 和 UPSERT(合并): to persist or not to persist?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59805398/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com