gpt4 book ai didi

apache-spark - 在 Spark 中加入并行性好吗?

转载 作者:行者123 更新时间:2023-12-03 20:30:30 24 4
gpt4 key购买 nike

我在一个非常小的数据集 上运行一个相当小的 Spark 程序,其中包含一些 map 和 reduceByKey 操作。小于 400MB .

在某些时候,我有一个想要排序的元组 RDD,我调用 sortByKey .这是我程序中最慢的部分。其他一切似乎几乎立即运行,但这需要 20 秒 .

问题是,在我的笔记本电脑以及 AWS m3.large 机器集群中需要 20 秒。我试过1、2、3个slave,执行时间差异很小。 Ganglia 和 spark web 控制台表明所有从站的 CPU 和内存都被使用到最大容量,所以我认为配置没问题。

我还发现执行的问题比我预期的要早,但后来 I read this thread ,这指向 Spark 中的一个 Unresolved 问题。我不认为这完全相关。

是吗 sortByKey 本质上很慢,我添加多少节点并不重要,它会决定我的程序的最短执行时间?希望不是,而且我做错了一些事情并且可以修复。

编辑

原来我所看到的与我发布的那个链接有关。 sortByKey 恰好是第一个 Action (记录为转换),看起来程序排序很慢,但实际上排序非常快。问题出在以前的 加入 手术。

我所说的一切仍然适用于通过连接更改排序。为什么当我添加更多节点(或 numTask 到 join 函数)时执行时间没有减少,为什么它甚至不比普通的 SQL join 更好?我找到了 someone else having this problem之前,但除了建议调整序列化之外没有其他答案,我真的不认为这是我的情况。

最佳答案

联接本质上是一项繁重的操作,因为必须将具有相同键的值移动到同一台机器上(网络洗牌)。添加更多节点只会增加额外的 IO 开销。

我能想到两件事:

选项 1

如果您将大型数据集与较小的数据集连接起来,则广播较小的数据集是值得的:

val large = sc.textFile("large.txt").map(...) 
val smaller = sc.textFile("smaller.txt").collect().toMap()
val bc = sc.broadcast(smaller)

然后做一个“手动加入”:
large.map(x => (x.value, bc.value(x.value)))

这在 this Advanced Spark presentation 中有更详细的描述。 .

选项 2

您可以使用与大数据集相同的分区器对小数据集进行重新分区(即确保相似的键位于同一台机器上)。因此,调整小集合的分区以匹配大集合的分区。

这只会触发小集合的洗牌。一旦分区正确,加入应该相当快,因为​​它将在每个集群节点上本地运行。

关于apache-spark - 在 Spark 中加入并行性好吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/24091283/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com