gpt4 book ai didi

apache-spark - 了解 SparkSQL 及其对分区的使用

转载 作者:行者123 更新时间:2023-12-01 12:26:05 24 4
gpt4 key购买 nike

我正在尝试针对某些数据操作查询评估 Spark SQL。我对此感兴趣的场景:

table1: key, value1, value2
table2: key, value3, value4

create table table3 as
select * from table1 join table2 on table1.key = table2.key

听起来我应该能够创建 table1 和 table2 RDD(但我在文档中没有看到一个非常明显的例子)。
但更大的问题是——如果我成功地按键对 2 个表 RDD 进行了分区,然后使用 Spark SQL 将它们连接起来,它是否足够聪明以利用分区?如果我创建了一个新的 RDD 作为连接的结果,它也会被分区吗?换句话说,它会完全免洗牌吗?
我非常感谢有关这些主题的文档和/或示例的指针。

最佳答案

如果您的意思是 RDDsDatasets 之间的转换,那么这两个问题的答案是否定的。

RDD 分区仅针对 RDD[(T, U)] 定义,将在 RDD 转换为 Dataset 后丢失。在某些情况下,您可以从每个现有的数据布局中受益,但 join 不是其中之一,尤其是 RDDsDatasets 使用不同的散列技术(分别是标准的 hashCodeMurmurHash。您当然可以通过定义自定义分区程序 RDD 来模拟后者这不是重点)。

同样,当 Dataset 转换为 RDD 时,有关分区的信息也会丢失。

您可以使用 Dataset 分区,但可用于优化 joins。例如,如果表已预先分区:

val n: Int = ??? 

val df1 = Seq(
("key1", "val1", "val2"), ("key2", "val3", "val4")
).toDF("key", "val1", "val2").repartition(n, $"key").cache

val df2 = Seq(
("key1", "val5", "val6"), ("key2", "val7", "val8")
).toDF("key", "val3", "val4").repartition(n, $"key").cache

后续基于 joinkey 不需要额外的交换。

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1

df1.explain
// == Physical Plan ==
// InMemoryTableScan [key#171, val1#172, val2#173]
// +- InMemoryRelation [key#171, val1#172, val2#173], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
// +- Exchange hashpartitioning(key#171, 3)
// +- LocalTableScan [key#171, val1#172, val2#173]

df2.explain
// == Physical Plan ==
// InMemoryTableScan [key#201, val3#202, val4#203]
// +- InMemoryRelation [key#201, val3#202, val4#203], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
// +- Exchange hashpartitioning(key#201, 3)
// +- LocalTableScan [key#201, val3#202, val4#203]
//

df1.join(df3, Seq("key")).explain
// == Physical Plan ==
// *Project [key#171, val1#172, val2#173, val5#232, val6#233]
// +- *SortMergeJoin [key#171], [key#231], Inner
// :- *Sort [key#171 ASC], false, 0
// : +- *Filter isnotnull(key#171)
// : +- InMemoryTableScan [key#171, val1#172, val2#173], [isnotnull(key#171)]
// : +- InMemoryRelation [key#171, val1#172, val2#173], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
// : +- Exchange hashpartitioning(key#171, 3)
// : +- LocalTableScan [key#171, val1#172, val2#173]
// +- *Sort [key#231 ASC], false, 0
// +- *Filter isnotnull(key#231)
// +- InMemoryTableScan [key#231, val5#232, val6#233], [isnotnull(key#231)]
// +- InMemoryRelation [key#231, val5#232, val6#233], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
// +- Exchange hashpartitioning(key#231, 3)
// +- LocalTableScan [key#231, val5#232, val6#233]

显然,我们并没有真正从单个连接中受益。所以只有当一个表用于多个 joins 时才有意义。

Spark 也可以从 join 创建的分区中受益,所以如果我们想执行另一个 join :

val df3 = Seq(
("key1", "val9", "val10"), ("key2", "val11", "val12")
).toDF("key", "val5", "val6")

df1.join(df3, Seq("key")).join(df3, Seq("key"))

我们将从第一个操作创建的结构中受益(注意 ReusedExchange ):

// == Physical Plan ==
// *Project [key#171, val1#172, val2#173, val5#682, val6#683, val5#712, val6#713]
// +- *SortMergeJoin [key#171], [key#711], Inner
// :- *Project [key#171, val1#172, val2#173, val5#682, val6#683]
// : +- *SortMergeJoin [key#171], [key#681], Inner
// : :- *Sort [key#171 ASC], false, 0
// : : +- Exchange hashpartitioning(key#171, 200)
// : : +- *Filter isnotnull(key#171)
// : : +- InMemoryTableScan [key#171, val1#172, val2#173], [isnotnull(key#171)]
// : : +- InMemoryRelation [key#171, val1#172, val2#173], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
// : : +- Exchange hashpartitioning(key#171, 3)
// : : +- LocalTableScan [key#171, val1#172, val2#173]
// : +- *Sort [key#681 ASC], false, 0
// : +- Exchange hashpartitioning(key#681, 200)
// : +- *Project [_1#677 AS key#681, _2#678 AS val5#682, _3#679 AS val6#683]
// : +- *Filter isnotnull(_1#677)
// : +- LocalTableScan [_1#677, _2#678, _3#679]
// +- *Sort [key#711 ASC], false, 0
// +- ReusedExchange [key#711, val5#712, val6#713], Exchange hashpartitioning(key#681, 200)

关于apache-spark - 了解 SparkSQL 及其对分区的使用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39538717/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com