gpt4 book ai didi

scala - distinct() 是否对数据集进行排序?

转载 作者:行者123 更新时间:2023-12-01 09:43:07 26 4
gpt4 key购买 nike

我正在编写一个预处理应用程序,除其他转换和操作外,它会在将数据集写入 HDFS 之前对数据集进行排序。一个新的请求要求我对数据集进行重复数据删除,所以我想在一个阶段进行排序。我的理解是,为了有效地进行重复数据删除,排序是必要的(也许我在这方面错了,没有研究太多,只是看起来很自然)。

由于某些原因(输出模式中的 MapType 列),我首先测试了 distinct早于 sort ,以为我会摆脱MapType稍后列以便将它们合并在一起。

Spark UI output

发生的事情是跳过了排序的第二阶段,就好像数据集已经排序了一样。这对我来说很有意义,但在文档 (AFAIK) 中的任何地方都不支持,我不知道,它是否是稳定的预期行为(我不想将其推向生产只是为了意识到我突然做 2 个昂贵的阶段:sortdistinct 两者)。任何人都对 sort 有更多的见解和/或 distinct实现的?

最佳答案

在 Spark 中,distinct以及一般所有的聚合操作(例如 groupBy )不要对数据进行排序。我们可以使用 explain 轻松检查。功能。

// Let's generate a df with 5 elements in [0, 4[ to have at least one duplicate
val data = spark.range(5).select(floor(rand() * 4) as "r")

data.distinct.explain
== Physical Plan ==
*HashAggregate(keys=[r#105L], functions=[])
+- Exchange hashpartitioning(r#105L, 200)
+- *HashAggregate(keys=[r#105L], functions=[])
+- *Project [FLOOR((rand(7842501052366484791) * 5.0)) AS r#105L]
+- *Range (0, 10, step=1, splits=2)
HashAggregate + Exchange意味着元素被散列和混洗,以便具有相同散列的元素在同一分区中。然后,对具有相同散列的元素进行比较和去重。因此,数据不会在处理后排序。让我们检查一下:

data.distinct.show()
+---+
| r|
+---+
| 0|
| 3|
| 2|
+---+

现在让我们解决您对性能的担忧。如果您在重复数据删除后进行排序,则会发生以下情况。

data.distinct.orderBy("r").explain
== Physical Plan ==
*Sort [r#227L ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(r#227L ASC NULLS FIRST, 200)
+- *HashAggregate(keys=[r#227L], functions=[])
+- Exchange hashpartitioning(r#227L, 200)
+- *HashAggregate(keys=[r#227L], functions=[])
+- *Project [FLOOR((rand(-8636860894475783181) * 4.0)) AS r#227L]
+- *Range (0, 5, step=1, splits=2)

我们可以看到数据被打乱以进行重复数据删除( Exchange hashpartitioning )并再次打乱以进行排序( Exchange rangepartitioning )。那相当昂贵。这是因为排序需要按范围进行洗牌,以便同一范围内的元素最终位于同一分区中,然后可以对其进行排序。然而,我们可以在重复数据删除之前更聪明地进行排序:

data.orderBy("r").distinct.explain
== Physical Plan ==
*HashAggregate(keys=[r#227L], functions=[])
+- *HashAggregate(keys=[r#227L], functions=[])
+- *Sort [r#227L ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(r#227L ASC NULLS FIRST, 200)
+- *Project [FLOOR((rand(-8636860894475783181) * 4.0)) AS r#227L]
+- *Range (0, 5, step=1, splits=2)

只剩下一个交换。事实上,spark 知道在按范围 shuffle 之后,重复的元素在同一个分区中。因此它不会触发新的洗牌。

关于scala - distinct() 是否对数据集进行排序?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56441314/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com