- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
如何强制 Spark 中的数据帧完全外部连接以使用 Boradcast 哈希连接?这是代码片段:
sparkConfiguration.set("spark.sql.autoBroadcastJoinThreshold", "1000000000")
val Result = BigTable.join(
org.apache.spark.sql.functions.broadcast(SmallTable),
Seq("X", "Y", "Z", "W", "V"),
"outer"
)
我的 SmallTable 的大小远小于上面指定的 autoBroadcastJoinThreshold
。此外,如果我使用内部、left_outer
或 right_outer
连接,我会从 DAG 可视化中看到该连接按预期使用 BroadcastHashJoin
。
但是,当我使用“outer
”作为连接类型时,spark 出于某种未知原因决定使用 SortMergeJoin
。有谁知道如何解决这个问题?根据我看到的左外连接的性能,BroadcastHashJoin
将有助于使我的应用程序加速数倍。
最佳答案
spark decides to use SortMergeJoin for some unknown reason. Does anyone know how to solve this problem?
原因: FullOuter(表示任何关键字 outer
、full
、fullouter
)不支持广播哈希连接(又名 map 侧连接)
如何证明这一点?
让我们举一个例子:
package com.examplesimport org.apache.log4j.{Level, Logger}import org.apache.spark.internal.Loggingimport org.apache.spark.sql.SparkSessionimport org.apache.spark.sql.functions._/** * Join Example and some basics demonstration using sample data. * * @author : Ram Ghadiyaram */object JoinExamples extends Logging { // switch off un necessary logs Logger.getLogger("org").setLevel(Level.OFF) val spark: SparkSession = SparkSession.builder.config("spark.master", "local").getOrCreate; case class Person(name: String, age: Int, personid: Int) case class Profile(name: String, personId: Int, profileDescription: String) /** * main * * @param args Array[String] */ def main(args: Array[String]): Unit = { spark.conf.set("spark.sql.join.preferSortMergeJoin", "false") import spark.implicits._ spark.sparkContext.getConf.getAllWithPrefix("spark.sql").foreach(x => logInfo(x.toString())) /** * create 2 dataframes here using case classes one is Person df1 and another one is profile df2 */ val df1 = spark.sqlContext.createDataFrame( spark.sparkContext.parallelize( Person("Sarath", 33, 2) :: Person("KangarooWest", 30, 2) :: Person("Ravikumar Ramasamy", 34, 5) :: Person("Ram Ghadiyaram", 42, 9) :: Person("Ravi chandra Kancharla", 43, 9) :: Nil)) val df2 = spark.sqlContext.createDataFrame( Profile("Spark", 2, "SparkSQLMaster") :: Profile("Spark", 5, "SparkGuru") :: Profile("Spark", 9, "DevHunter") :: Nil ) // you can do alias to refer column name with aliases to increase readablity val df_asPerson = df1.as("dfperson") val df_asProfile = df2.as("dfprofile") /** * * Example displays how to join them in the dataframe level * next example demonstrates using sql with createOrReplaceTempView */ val joined_df = df_asPerson.join( broadcast(df_asProfile) , col("dfperson.personid") === col("dfprofile.personid") , "outer") val joined = joined_df.select( col("dfperson.name") , col("dfperson.age") , col("dfprofile.name") , col("dfprofile.profileDescription")) joined.explain(false) // it will show which join was used joined.show }}
I tried to use broadcast hint for fullouter
join but framework is ignoring and its taking SortMergeJoin
below is the explain plan for this.Result :
== Physical Plan ==*Project [name#4, age#5, name#11, profileDescription#13]+- SortMergeJoin [personid#6], [personid#12], FullOuter :- *Sort [personid#6 ASC NULLS FIRST], false, 0 : +- Exchange hashpartitioning(personid#6, 200) : +- *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, com.examples.JoinExamples$Person, true]).name, true) AS name#4, assertnotnull(input[0, com.examples.JoinExamples$Person, true]).age AS age#5, assertnotnull(input[0, com.examples.JoinExamples$Person, true]).personid AS personid#6] : +- Scan ExternalRDDScan[obj#3] +- *Sort [personid#12 ASC NULLS FIRST], false, 0 +- Exchange hashpartitioning(personid#12, 200) +- LocalTableScan [name#11, personId#12, profileDescription#13]+--------------------+---+-----+------------------+| name|age| name|profileDescription|+--------------------+---+-----+------------------+| Ravikumar Ramasamy| 34|Spark| SparkGuru|| Ram Ghadiyaram| 42|Spark| DevHunter||Ravi chandra Kanc...| 43|Spark| DevHunter|| Sarath| 33|Spark| SparkSQLMaster|| KangarooWest| 30|Spark| SparkSQLMaster|+--------------------+---+-----+------------------+
From spark 2.3 Merge-Sort join is the default join algorithm in spark. However, this can be turned down by using the internal parameter ‘spark.sql.join.preferSortMergeJoin’ which by default is true.
Other case except fullouter
join ... If you dont want spark to use sortmergejoin in any case you can set the below property.
sparkSession.conf.set("spark.sql.join.preferSortMergeJoin", "false")
这是代码 SparkStrategies.scala
(which is responsible & Converts a logical plan into zero or more SparkPlans) 的说明您不想使用 sortmergejoin
。
此属性 spark.sql.join.preferSortMergeJoin
当为 true 时,首选排序合并连接而不是随机散列连接 PREFER_SORTMERGEJOIN属性(property)。
设置false
意味着spark不能只选择broadcasthashjoin,它也可以是其他任何东西(例如shuffle hash join)。
下面的文档位于 SparkStrategies.scala
中,即在 object JoinSelection extends Strategy with PredicateHelper ... 的顶部...
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD
]] 阈值 或者如果该方有明确的广播提示(例如,用户应用了 [[org.apache.spark.sql.functions.broadcast()
]] 函数到 DataFrame
),然后那边 连接的一侧将被广播,另一侧将被流式传输,没有洗牌 执行。如果连接双方都有资格进行广播,则Shuffle hash join:如果单个分区的平均大小足够小以构建哈希表。
排序合并:如果匹配的连接键可排序。
关于scala - Spark 中的 Broadcast Hash Join (BHJ) 用于全外连接(outer、full、fulouter),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43622483/
我正在测试设置SQLAlchemy以映射现有数据库。这个数据库是很久以前自动建立的,它是由我们不再使用的先前的第三方应用程序创建的,因此 undefined 某些预期的事情,例如外键约束。该软件将管理
这个问题在这里已经有了答案: What is the difference between "INNER JOIN" and "OUTER JOIN"? (28 个答案) 关闭 7 年前。 INNE
这个问题在这里已经有了答案: What is the difference between "INNER JOIN" and "OUTER JOIN"? (29 个回答) 关闭7年前. INNER J
假设有两个表: table1.c1 table1.c2 1 1 A 2 1 B 3 1 C 4 2
假设有两个表: table1.c1 table1.c2 1 1 A 2 1 B 3 1 C 4 2
一.先看一些最简单的例子 例子 Table A aid adate 1 a1 2&nb
数据库操作语句 7. 外连接——交叉查询 7.1 查询 7.2 等值连接 7.3 右外
我有两个表 'users' 和 'lms_users' class LmsUser belongs_to :user end class User has_one :lms_user
我试图避免在 Rails 中对我的 joins 进行字符串插值,因为我注意到将查询器链接在一起时灵活性会降低。 也就是说,我觉得 joins(:table1) 比 joins('inner join
我有这个代码 User.find(:all, :limit => 10, :joins => :user_points, :select => "users.*, co
我刚刚开始探索 Symfony2,我很惊讶它拥有如此多的强大功能。我开始做博客教程在: http://tutorial.symblog.co.uk/ 但使用的是 2.1 版而不是 2.0 我的问题是我
什么是 SQL JOIN什么是不同的类型? 最佳答案 插图来自 W3schools : 关于SQL JOIN 和不同类型的 JOIN,我们在Stack Overflow上找到一个类似的问题: http
我有两个 Hive 表,我正在尝试加入它们。这些表没有被任何字段聚集或分区。尽管表包含公共(public)键字段的记录,但连接查询始终返回 0 条记录。所有数据类型都是“字符串”数据类型。 连接查询很
我正在使用 Solr 的(4.0.0-beta)连接功能来查询包含具有父/子关系的文档的索引。连接查询效果很好,但我只能在搜索结果中获得父文档。我相信这是预期的行为。 但是,是否有可能在搜索结果中同时
我正在使用可用的指南/api/书籍自学 Rails,但我无法理解通过三种方式/嵌套 has_many :through 关联进行的连接。 我有用户与组相关联:通过成员(member)资格。 我在多对多
什么是 SQL JOIN,有哪些不同的类型? 最佳答案 插图来自 W3schools : 关于SQL JOIN 和不同类型的 JOIN,我们在Stack Overflow上找到一个类似的问题: htt
我正在尝试访问数据库的两个表。在商店里,我保留了一个事件列表,其中包含 Table Event id, name,datei,houri, dateF,Hourf ,capacity, age ,de
我有 4 个表:booking、address、search_address 和 search_address_log 表:(相关列) 预订:(pickup_address_id, dropoff_a
我在YML中有以下结构:。我正试着创造一个这样的结构:。作业名称和脚本用~分隔,作业用;分隔。。我可以使用以下命令使其正常工作。然而,我想知道是否可以用一个yq表达式来完成,而不是通过管道再次使用yq
我在YML中有以下结构:。我正试着创造一个这样的结构:。作业名称和脚本用~分隔,作业用;分隔。。我可以使用以下命令使其正常工作。然而,我想知道是否可以用一个yq表达式来完成,而不是通过管道再次使用yq
我是一名优秀的程序员,十分优秀!