gpt4 book ai didi

apache-spark - Spark复用广播DF

转载 作者:行者123 更新时间:2023-12-04 15:43:15 24 4
gpt4 key购买 nike

我想重用我的 DataFrame(而不回退到使用 RDD/Dataset 中的“Map”函数来执行此操作),我将其标记为可广播,但似乎 Spark 一直在广播它。

有一个表“bank”(测试表)。我执行以下操作:

  val cachedDf = spark.sql("select * from bank").cache
cachedDf.count

val dfBroadcasted = broadcast(cachedDf)

val dfNormal = spark.sql("select * from bank")

dfNormal.join(dfBroadcasted, List("age"))
.join(dfBroadcasted, List("age")).count

我之前进行了缓存,以防万一它有所作为,但不管有没有都一样。

如果我执行上面的代码,我会看到以下 SQL 计划:

SQL plan for the code

如您所见,我广播的 DF 以不同的时间广播了两次(如果我之后添加更多 Action ,它们也会再次广播)。

我很关心这个,因为我实际上有一个长期运行的程序,它有一个“大”DataFrame,我可以用它来过滤掉巨大的 DataFrame,我希望这个“大”DataFrame 能够被重用。

有没有办法强制可重用性? (不仅在同一个 Action 中,而且在 Action 之间,我可以通过相同的 Action 生存下来)

谢谢!

最佳答案

好的,更新问题。

总结:在同一个 Action 中,left_semis 将重用广播而正常/左连接不会。不确定与 Spark/开发人员已经知道该 DF 的列根本不会影响输出的事实有关,因此他们可以重用它,或者它只是缺少优化 Spark 。

我的问题似乎已基本解决,但如果有人知道如何保持跨操作的广播会更好。

如果我使用 left_semi(这是我将在我的真实应用程序中使用的连接),广播只会执行一次。

与:

    dfNormalxx.join(dfBroadcasted, Seq("age"),"left_semi")
.join(dfBroadcasted, Seq("age"),"left_semi").count

计划变成了(我也改变了尺寸以匹配我的真实尺寸,但这没有区别):

enter image description here

此外,wall 的总时间比使用“left_semi”时要好得多(我设置了 1 个执行程序,因此它不会并行化,只是想检查作业是否真的完成了两次):

enter image description here

即使我的收集需要 10 秒,这将加速表读取 + groupBys,这需要 6-7 分钟

关于apache-spark - Spark复用广播DF,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56924727/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com