
scala - Broadcast Hash Join (BHJ) in Spark for a full outer join (outer, full, fullouter)


How can I force a full outer join between DataFrames in Spark to use a broadcast hash join? Here is the code snippet:

sparkConfiguration.set("spark.sql.autoBroadcastJoinThreshold", "1000000000")
val Result = BigTable.join(
  org.apache.spark.sql.functions.broadcast(SmallTable),
  Seq("X", "Y", "Z", "W", "V"),
  "outer"
)

My SmallTable is far smaller than the autoBroadcastJoinThreshold specified above. Also, if I use an inner, left_outer, or right_outer join, I can see from the DAG visualization that the join uses BroadcastHashJoin as expected.

However, when I use "outer" as the join type, Spark decides to use SortMergeJoin for some unknown reason. Does anyone know how to solve this? Based on the performance I have seen with the left outer join, BroadcastHashJoin would speed up my application several times over.
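A quick way to reproduce this observation is to print the physical plan for each join type and look for BroadcastHashJoin versus SortMergeJoin. A minimal sketch (the toy DataFrames and column names here are made up for illustration):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

val spark = SparkSession.builder.master("local[*]").getOrCreate()
import spark.implicits._

// toy stand-ins for BigTable / SmallTable
val big   = Seq((1, "a"), (2, "b"), (3, "c")).toDF("X", "payload")
val small = Seq((1, "p"), (2, "q")).toDF("X", "desc")

// compare the join operator chosen per join type
for (joinType <- Seq("inner", "left_outer", "right_outer", "outer")) {
  println(s"=== $joinType ===")
  big.join(broadcast(small), Seq("X"), joinType).explain()
  // inner/left_outer/right_outer plan a BroadcastHashJoin;
  // "outer" falls back to SortMergeJoin
}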

Best Answer

spark decides to use SortMergeJoin for some unknown reason. Does anyone know how to solve this problem?

Reason: FullOuter (meaning any of the keywords outer, full, or fullouter) does not support a broadcast hash join (a.k.a. map-side join).
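The reason is visible in Spark's join planning: a broadcast hash join must build its hash relation on one side of the join, and FullOuter is eligible on neither side. Here is a sketch paraphrasing the eligibility checks from Spark 2.x's SparkStrategies.scala (object JoinSelection; the exact code varies by Spark version, so treat this as an approximation rather than the verbatim source):

import org.apache.spark.sql.catalyst.plans._

// paraphrase of JoinSelection's build-side checks, not the verbatim source
def canBuildRight(joinType: JoinType): Boolean = joinType match {
  case _: InnerLike | LeftOuter | LeftSemi | LeftAnti | _: ExistenceJoin => true
  case _ => false // FullOuter ends up here: the right side cannot be broadcast
}

def canBuildLeft(joinType: JoinType): Boolean = joinType match {
  case _: InnerLike | RightOuter => true
  case _ => false // FullOuter ends up here too: the left side cannot be broadcast
}

println(canBuildLeft(FullOuter))  // false
println(canBuildRight(FullOuter)) // false

Since neither side qualifies, JoinSelection never plans a broadcast hash join for a full outer join, regardless of table size or hints.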

How can this be proven?

Let's take an example:

package com.examples

import org.apache.log4j.{Level, Logger}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

/**
  * Join Example and some basics demonstration using sample data.
  *
  * @author : Ram Ghadiyaram
  */
object JoinExamples extends Logging {
  // switch off unnecessary logs
  Logger.getLogger("org").setLevel(Level.OFF)

  val spark: SparkSession = SparkSession.builder.config("spark.master", "local").getOrCreate;

  case class Person(name: String, age: Int, personid: Int)

  case class Profile(name: String, personId: Int, profileDescription: String)

  /**
    * main
    *
    * @param args Array[String]
    */
  def main(args: Array[String]): Unit = {
    spark.conf.set("spark.sql.join.preferSortMergeJoin", "false")
    import spark.implicits._
    spark.sparkContext.getConf.getAllWithPrefix("spark.sql").foreach(x => logInfo(x.toString()))
    /**
      * create 2 dataframes here using case classes; one is Person df1 and the other is Profile df2
      */
    val df1 = spark.sqlContext.createDataFrame(
      spark.sparkContext.parallelize(
        Person("Sarath", 33, 2)
          :: Person("KangarooWest", 30, 2)
          :: Person("Ravikumar Ramasamy", 34, 5)
          :: Person("Ram Ghadiyaram", 42, 9)
          :: Person("Ravi chandra Kancharla", 43, 9)
          :: Nil))
    val df2 = spark.sqlContext.createDataFrame(
      Profile("Spark", 2, "SparkSQLMaster")
        :: Profile("Spark", 5, "SparkGuru")
        :: Profile("Spark", 9, "DevHunter")
        :: Nil
    )
    // you can use aliases to refer to column names, to increase readability
    val df_asPerson = df1.as("dfperson")
    val df_asProfile = df2.as("dfprofile")
    /**
      * This example displays how to join at the DataFrame level;
      * the next example demonstrates using SQL with createOrReplaceTempView
      */
    val joined_df = df_asPerson.join(
      broadcast(df_asProfile)
      , col("dfperson.personid") === col("dfprofile.personid")
      , "outer")
    val joined = joined_df.select(
      col("dfperson.name")
      , col("dfperson.age")
      , col("dfprofile.name")
      , col("dfprofile.profileDescription"))
    joined.explain(false) // it will show which join was used
    joined.show
  }
}

I tried to use a broadcast hint for the fullouter join, but the framework ignores it and takes SortMergeJoin. Below is the explain plan for this. Result:

== Physical Plan ==
*Project [name#4, age#5, name#11, profileDescription#13]
+- SortMergeJoin [personid#6], [personid#12], FullOuter
   :- *Sort [personid#6 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(personid#6, 200)
   :     +- *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, com.examples.JoinExamples$Person, true]).name, true) AS name#4, assertnotnull(input[0, com.examples.JoinExamples$Person, true]).age AS age#5, assertnotnull(input[0, com.examples.JoinExamples$Person, true]).personid AS personid#6]
   :        +- Scan ExternalRDDScan[obj#3]
   +- *Sort [personid#12 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(personid#12, 200)
         +- LocalTableScan [name#11, personId#12, profileDescription#13]

+--------------------+---+-----+------------------+
|                name|age| name|profileDescription|
+--------------------+---+-----+------------------+
|  Ravikumar Ramasamy| 34|Spark|         SparkGuru|
|      Ram Ghadiyaram| 42|Spark|         DevHunter|
|Ravi chandra Kanc...| 43|Spark|         DevHunter|
|              Sarath| 33|Spark|    SparkSQLMaster|
|        KangarooWest| 30|Spark|    SparkSQLMaster|
+--------------------+---+-----+------------------+

From Spark 2.3, sort-merge join is the default join algorithm in Spark. However, it can be turned off via the internal parameter spark.sql.join.preferSortMergeJoin, which is true by default.

For cases other than a fullouter join: if you don't want Spark to use SortMergeJoin in any case, you can set the property below.

sparkSession.conf.set("spark.sql.join.preferSortMergeJoin", "false")
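For example (a sketch with hypothetical toy DataFrames; note that ShuffledHashJoin is only chosen when the size conditions in JoinSelection also hold):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.master("local[*]").getOrCreate()
import spark.implicits._

spark.conf.set("spark.sql.join.preferSortMergeJoin", "false")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1") // rule out broadcast

val left  = Seq((1, "a"), (2, "b")).toDF("k", "l")
val right = Seq((1, "x"), (3, "y")).toDF("k", "r")

// with the sort-merge preference off and broadcast disabled, Spark may
// plan a ShuffledHashJoin for this equi-join instead of a SortMergeJoin
left.join(right, Seq("k"), "inner").explain()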

This is explained in the code of SparkStrategies.scala (which is responsible for converting a logical plan into zero or more SparkPlans), for the case where you don't want to use SortMergeJoin.

Important note:

When the property spark.sql.join.preferSortMergeJoin is true, sort-merge join is preferred over shuffled hash join (the PREFER_SORTMERGEJOIN property).

Setting it to false does not mean Spark will necessarily pick BroadcastHashJoin; the choice can also be anything else (e.g., shuffle hash join).

The documentation below is in SparkStrategies.scala, at the top of object JoinSelection extends Strategy with PredicateHelper ...:

  • Broadcast: if one side of the join has an estimated physical size smaller than the user-configurable [[SQLConf.AUTO_BROADCASTJOIN_THRESHOLD]] threshold, or if that side has an explicit broadcast hint (e.g., the user applied the [[org.apache.spark.sql.functions.broadcast()]] function to a DataFrame), then that side of the join is broadcast and the other side is streamed, with no shuffle performed. If both sides of the join are eligible to be broadcast, then ...

  • Shuffle hash join: if the average size of a single partition is small enough to build a hash table.

  • Sort merge: if the matching join keys are sortable.
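If you want to check which of these strategies was actually picked programmatically rather than reading explain() output, you can walk the executed plan. A sketch, assuming the joined Dataset from the example above (the operator class names are Spark 2.x internals):

import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, ShuffledHashJoinExec, SortMergeJoinExec}

// collect the physical join operators that were planned
val chosen = joined.queryExecution.executedPlan.collect {
  case _: BroadcastHashJoinExec => "broadcast hash join"
  case _: ShuffledHashJoinExec  => "shuffled hash join"
  case _: SortMergeJoinExec     => "sort merge join"
}
println(chosen.mkString(", ")) // "sort merge join" for the FullOuter example above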

Regarding scala - Broadcast Hash Join (BHJ) in Spark for a full outer join (outer, full, fullouter), we found a similar question on Stack Overflow: https://stackoverflow.com/questions/43622483/
