How can I force a full outer join between dataframes in Spark to use a broadcast hash join? Here is the code snippet:

sparkConfiguration.set("spark.sql.autoBroadcastJoinThreshold", "1000000000")
val Result = BigTable.join(
  org.apache.spark.sql.functions.broadcast(SmallTable),
  Seq("X", "Y", "Z", "W", "V"),
  "outer"
)
The size of my SmallTable is far below the autoBroadcastJoinThreshold specified above. Moreover, if I use an inner, left_outer, or right_outer join, I can see from the DAG visualization that the join uses BroadcastHashJoin as expected.
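A quick way to confirm which strategy was chosen, as a minimal sketch reusing the BigTable/SmallTable dataframes from the snippet above: print the physical plan instead of checking the DAG UI.

// Sketch only: BigTable and SmallTable are the dataframes from the snippet above.
import org.apache.spark.sql.functions.broadcast
BigTable.join(broadcast(SmallTable), Seq("X", "Y", "Z", "W", "V"), "left_outer")
  .explain() // the plan contains a BroadcastHashJoin node when BHJ was selected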
However, when I use "outer" as the join type, Spark decides to use SortMergeJoin for some unknown reason. Does anyone know how to solve this? Judging by the performance I saw with the left outer join, BroadcastHashJoin would speed my application up several times over.
Best answer
spark decides to use SortMergeJoin for some unknown reason. Does anyone know how to solve this problem?
Reason: FullOuter (meaning any of the keywords outer, full, or fullouter) does not support broadcast hash join (a.k.a. map-side join).
How can we prove it? Let's take an example:
package com.examples

import org.apache.log4j.{Level, Logger}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

/**
 * Join Example and some basics demonstration using sample data.
 *
 * @author : Ram Ghadiyaram
 */
object JoinExamples extends Logging {
  // switch off unnecessary logs
  Logger.getLogger("org").setLevel(Level.OFF)
  val spark: SparkSession = SparkSession.builder.config("spark.master", "local").getOrCreate

  case class Person(name: String, age: Int, personid: Int)
  case class Profile(name: String, personId: Int, profileDescription: String)

  /**
   * main
   *
   * @param args Array[String]
   */
  def main(args: Array[String]): Unit = {
    spark.conf.set("spark.sql.join.preferSortMergeJoin", "false")
    import spark.implicits._
    spark.sparkContext.getConf.getAllWithPrefix("spark.sql").foreach(x => logInfo(x.toString()))
    /**
     * create 2 dataframes here using case classes, one is Person df1 and another one is Profile df2
     */
    val df1 = spark.sqlContext.createDataFrame(
      spark.sparkContext.parallelize(
        Person("Sarath", 33, 2)
          :: Person("KangarooWest", 30, 2)
          :: Person("Ravikumar Ramasamy", 34, 5)
          :: Person("Ram Ghadiyaram", 42, 9)
          :: Person("Ravi chandra Kancharla", 43, 9)
          :: Nil))
    val df2 = spark.sqlContext.createDataFrame(
      Profile("Spark", 2, "SparkSQLMaster")
        :: Profile("Spark", 5, "SparkGuru")
        :: Profile("Spark", 9, "DevHunter")
        :: Nil
    )
    // you can do alias to refer column names with aliases to increase readability
    val df_asPerson = df1.as("dfperson")
    val df_asProfile = df2.as("dfprofile")
    /**
     * Example displays how to join them at the dataframe level;
     * next example demonstrates using sql with createOrReplaceTempView
     */
    val joined_df = df_asPerson.join(
      broadcast(df_asProfile),
      col("dfperson.personid") === col("dfprofile.personid"),
      "outer")
    val joined = joined_df.select(
      col("dfperson.name"),
      col("dfperson.age"),
      col("dfprofile.name"),
      col("dfprofile.profileDescription"))
    joined.explain(false) // it will show which join was used
    joined.show
  }
}
I tried to use a broadcast hint for the fullouter join, but the framework ignores it and uses SortMergeJoin instead. Below is the explain plan. Result:
== Physical Plan ==
*Project [name#4, age#5, name#11, profileDescription#13]
+- SortMergeJoin [personid#6], [personid#12], FullOuter
   :- *Sort [personid#6 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(personid#6, 200)
   :     +- *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, com.examples.JoinExamples$Person, true]).name, true) AS name#4, assertnotnull(input[0, com.examples.JoinExamples$Person, true]).age AS age#5, assertnotnull(input[0, com.examples.JoinExamples$Person, true]).personid AS personid#6]
   :        +- Scan ExternalRDDScan[obj#3]
   +- *Sort [personid#12 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(personid#12, 200)
         +- LocalTableScan [name#11, personId#12, profileDescription#13]

+--------------------+---+-----+------------------+
|                name|age| name|profileDescription|
+--------------------+---+-----+------------------+
|  Ravikumar Ramasamy| 34|Spark|         SparkGuru|
|      Ram Ghadiyaram| 42|Spark|         DevHunter|
|Ravi chandra Kanc...| 43|Spark|         DevHunter|
|              Sarath| 33|Spark|    SparkSQLMaster|
|        KangarooWest| 30|Spark|    SparkSQLMaster|
+--------------------+---+-----+------------------+
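For contrast, a minimal sketch (assuming the same df_asPerson/df_asProfile from the example above, not part of the original answer): with "left_outer" instead of "outer", the broadcast hint should be honored, matching the observation in the question.

// Hypothetical variation of the example above, same df_asPerson/df_asProfile.
val joinedLeft = df_asPerson.join(
  broadcast(df_asProfile),
  col("dfperson.personid") === col("dfprofile.personid"),
  "left_outer")
joinedLeft.explain(false) // expected: a BroadcastHashJoin node instead of SortMergeJoin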
From Spark 2.3 onwards, sort-merge join is the default join algorithm in Spark. However, it can be turned off via the internal parameter 'spark.sql.join.preferSortMergeJoin', which is true by default.
For any case other than a fullouter join: if you don't want Spark to use SortMergeJoin, you can set the property below.

sparkSession.conf.set("spark.sql.join.preferSortMergeJoin", "false")
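A minimal usage sketch (assuming a session named sparkSession and two arbitrary dataframes dfA and dfB with a common column "id"; whether a ShuffledHashJoin is actually picked also depends on partition sizes and the Spark version):

// Sketch: with sort-merge de-preferred and auto-broadcast disabled, an inner
// equi-join may be planned as a ShuffledHashJoin instead of SortMergeJoin.
sparkSession.conf.set("spark.sql.join.preferSortMergeJoin", "false")
sparkSession.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
dfA.join(dfB, Seq("id"), "inner").explain() // inspect which physical join was chosen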
This is explained in the code of SparkStrategies.scala (which is responsible for converting a logical plan into zero or more SparkPlans), for the case where you don't want SortMergeJoin to be used.
The property spark.sql.join.preferSortMergeJoin (the PREFER_SORTMERGEJOIN property): when true, sort-merge join is preferred over shuffle hash join. Setting it to false does not mean Spark will necessarily pick BroadcastHashJoin; it may still choose something else (for example, shuffle hash join).
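For reference, the property is defined in Spark's SQLConf.scala roughly as follows (paraphrased; the exact builder calls and doc text vary by Spark version):

// Approximate definition from Spark 2.x SQLConf.scala (not verbatim).
val PREFER_SORTMERGEJOIN = buildConf("spark.sql.join.preferSortMergeJoin")
  .internal()
  .doc("When true, prefer sort merge join over shuffle hash join.")
  .booleanConf
  .createWithDefault(true)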
The documentation below is in SparkStrategies.scala, at the top of object JoinSelection extends Strategy with PredicateHelper:

- Broadcast: if one side of the join has an estimated physical size that is smaller than the user-configurable [[SQLConf.AUTO_BROADCASTJOIN_THRESHOLD]] threshold, or if that side has an explicit broadcast hint (e.g. the user applied the [[org.apache.spark.sql.functions.broadcast()]] function to a DataFrame), then that side of the join will be broadcast and the other side will be streamed, with no shuffling performed. If both sides of the join are eligible to be broadcast [...]
- Shuffle hash join: if the average size of a single partition is small enough to build a hash table.
- Sort merge: if the matching join keys are sortable.
Regarding "scala - Broadcast Hash Join (BHJ) in Spark for full outer join (outer, full, fullouter)", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/43622483/