gpt4 book ai didi

apache-spark - Spark.sql.autoBroadcastJoinThreshold 是否适用于使用数据集的联接运算符的联接?

转载 作者:行者123 更新时间:2023-12-03 07:20:55 29 4
gpt4 key购买 nike

我想知道 spark.sql.autoBroadcastJoinThreshold 属性对于在所有工作节点上广播较小的表(在进行联接时)是否有用,即使联接方案使用数据集 API加入而不是使用 Spark SQL。

如果我的较大表是 250 Gigs,较小表是 20 Gigs,我是否需要设置此配置:spark.sql.autoBroadcastJoinThreshold = 21 Gigs(可能)以便发送整个表/数据集到所有工作节点?

示例:

  • 数据集 API 加入

    val result = rawBigger.as("b").join(
    broadcast(smaller).as("s"),
    rawBigger(FieldNames.CAMPAIGN_ID) === smaller(FieldNames.CAMPAIGN_ID),
    "left_outer"
    )
  • SQL

    select * 
    from rawBigger_table b, smaller_table s
    where b.campign_id = s.campaign_id;

最佳答案

首先,spark.sql.autoBroadcastJoinThresholdbroadcast 提示是单独的机制。即使禁用了 autoBroadcastJoinThreshold 设置广播提示也会优先。使用默认设置:

spark.conf.get("spark.sql.autoBroadcastJoinThreshold")
String = 10485760
val df1 = spark.range(100)
val df2 = spark.range(100)

Spark 将使用 autoBroadcastJoinThreshold 并自动广播数据:

df1.join(df2, Seq("id")).explain
== Physical Plan ==
*Project [id#0L]
+- *BroadcastHashJoin [id#0L], [id#3L], Inner, BuildRight
:- *Range (0, 100, step=1, splits=Some(8))
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
+- *Range (0, 100, step=1, splits=Some(8))

当我们禁用自动广播时,Spark 将使用标准 SortMergeJoin:

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
df1.join(df2, Seq("id")).explain
== Physical Plan ==
*Project [id#0L]
+- *SortMergeJoin [id#0L], [id#3L], Inner
:- *Sort [id#0L ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(id#0L, 200)
: +- *Range (0, 100, step=1, splits=Some(8))
+- *Sort [id#3L ASC NULLS FIRST], false, 0
+- ReusedExchange [id#3L], Exchange hashpartitioning(id#0L, 200)

但可以强制使用 BroadcastHashJoinbroadcast 提示:

df1.join(broadcast(df2), Seq("id")).explain
== Physical Plan ==
*Project [id#0L]
+- *BroadcastHashJoin [id#0L], [id#3L], Inner, BuildRight
:- *Range (0, 100, step=1, splits=Some(8))
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
+- *Range (0, 100, step=1, splits=Some(8))

SQL 有自己的提示格式(类似于 Hive 中使用的提示格式):

df1.createOrReplaceTempView("df1")
df2.createOrReplaceTempView("df2")

spark.sql(
"SELECT /*+ MAPJOIN(df2) */ * FROM df1 JOIN df2 ON df1.id = df2.id"
).explain
== Physical Plan ==
*BroadcastHashJoin [id#0L], [id#3L], Inner, BuildRight
:- *Range (0, 100, step=1, splits=8)
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
+- *Range (0, 100, step=1, splits=8)

因此,回答您的问题 - autoBroadcastJoinThreshold 在使用 Dataset API 时适用,但在使用显式 broadcast 提示时不相关。

此外,广播大型对象不太可能提供任何性能提升,并且在实践中通常会降低性能并导致稳定性问题。请记住,广播对象必须首先获取到驱动程序,然后发送到每个工作人员,最后加载到内存中。

关于apache-spark - Spark.sql.autoBroadcastJoinThreshold 是否适用于使用数据集的联接运算符的联接?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43984068/

29 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com