gpt4 book ai didi

scala - 仅使用 Spark-SQL API 时广播变量的使用

转载 作者:行者123 更新时间:2023-12-04 15:13:57 30 4
gpt4 key购买 nike

在使用 Spark-RDD API 时,我们可以使用广播变量来优化 spark 分配不可变状态的方式。

1) 广播变量在内部是如何工作的?

我的假设是:对于用于对数据集执行操作的每个闭包,它引用的所有变量都必须被序列化,通过网络传输并与任务一起恢复,以便闭包可以执行。

像这样注册广播变量时​​:

val broadcastVar = sc.broadcast("hello world")

返回的对象 (Broadcast[String]) 不保留对实际对象(“hello world”)的引用,而只保留一些 ID。当一个广播变量句柄像上面所说的那样从一个闭包中被引用时,它将像所有其他变量一样被序列化——只是广播变量句柄本身不包含实际对象。

当稍后在目标节点上执行闭包时,实际对象(“hello world”)已经传输到每个节点。当闭包到达调用 broadcastVar.value 的点时,广播变量句柄在内部使用 ID 检索实际对象。

这个假设是否正确?

2)Spark-SQL中有没有办法利用这种机制?

假设我有一组允许的值。

当使用 RDD-API 时,我会为我的 allowedValues 创建一个广播变量:

val broadcastAllowedValues = sc.broadcast(allowedValues) // Broadcast[Set[String]]

rdd.filter(row => broadcastAllowedValues.value.contains(row("mycol")))

当然,在使用 Spark-SQL-API 时,我会为此使用 Column.isin/Column.isInCollection 方法:

dataframe.where(col("mycol").isInCollection(allowedValues))

但似乎我无法通过这种方式获得广播变量的优势。

此外,如果我将这段代码更改为以下内容:

val broadcastAllowedValues = sc.broadcast(allowedValues) // Broadcast[Set[String]]

dataframe.where(col("mycol").isInCollection(allowedValues.value))

这部分:

col("mycol").isInCollection(allowedValues.value)
// and more important this part:
allowedValues.value

已经在驱动程序上进行了评估,从而产生了一个新的 Column-Object。所以广播变量在这里失去了它的优势。与第一个示例相比,它甚至会有一些开销......

有没有一种方法可以使用 Spark-SQL-API 来利用广播变量,或者我是否必须在这些时候显式使用 RDD-API?

最佳答案

How do broadcast-variables work internally?

广播的数据被序列化并物理移动到所有执行者。根据 Broadcast Variables 上的文档, 它说

“广播变量允许程序员将只读变量缓存在每台机器上,而不是随任务发送它的副本。”

Is there a way to take advantage of this mechanism in Spark-SQL?

是的,有一种方法可以利用。 Spark 在连接大小 Dataframe 时默认应用Broadcast Hash Join

根据《Learning Spark - 2nd edition》这本书,它说:

“默认情况下,如果较小的数据集小于 10MB,Spark 将使用广播连接。此配置在 spark.sql.autoBroadcastJoinThreshold 中设置;您可以根据如何减少或增加大小您在每个执行程序和驱动程序中都有很多内存。”

在您的情况下,您需要将所有唯一的 allowedValues 列到一个只有一列(名为 allowValues) 并应用联接来过滤您的 dataframe

像这样:

import org.apache.spark.sql.functions.broadcast
val result = dataframe.join(broadcast(allowedValuesDF), "mycol === allowedValues")

实际上,您可以省略 broadcast,因为 Spark 默认会进行广播连接。

编辑:

在更高版本的 Spark 中,您还可以在 SQL 语法中使用join hints 来告诉执行引擎使用哪些策略。 SQL Documentation 中提供了详细信息下面提供了一个示例:

-- Join Hints for broadcast join 
SELECT /*+ BROADCAST(t1) */ * FROM t1 INNER JOIN t2 ON t1.key = t2.key;

关于scala - 仅使用 Spark-SQL API 时广播变量的使用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64683189/

30 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com