gpt4 book ai didi

java - 请问 Spark 会安排加入吗?

转载 作者:搜寻专家 更新时间:2023-11-01 01:51:17 28 4
gpt4 key购买 nike

我正在加入两个 RDD rddArddB

rddA 有 100 个分区,rddB 有 500 个分区。

我正在尝试了解join 操作的机制。默认情况下,无论连接顺序如何,我最终都会得到相同的分区结构;即 rddA.join(rddB) 和 rddB.join(rddA) 产生相同数量的分区,通过观察它使用较小的分区大小,100。我知道我可以通过使用 rddA.join(rddB,500) 来增加分区大小,但我更感兴趣的是幕后发生的事情以及为什么选择较小的大小。据观察,即使我对小的rdd重新分区,它的分区仍然会被使用; Spark 是否对 key 大小进行任何启发式分析?

我遇到的另一个问题是偏斜程度。我的较小分区最终有 3,314 个条目,而较大的分区最终有 1,139,207 个条目,总大小为 599,911,729(键)。两个 RDD 都使用默认的分区器,那么 data shuffle 是如何决定的呢?我依稀记得读过,如果一个 rdd 有一个分区器集,那么将使用那个分区器。是这样吗? “推荐”这样做吗?

最后,请注意我的两个 rdd 都相对较大 (~90GB),因此广播连接无济于事。相反,任何为 join 操作提供一些见解的方法都可能是可行的方法。

附言。任何关于左右连接机制的细节都将是一个额外的好处:)

最佳答案

虽然我还没有设法解释分区是如何派生的,但我确实发现了数据是如何洗牌的(这是我最初的问题)。连接有一些副作用:

洗牌/分区:Spark 将散列分区“RDD”键并在“Workers”之间移动/分发。给定键的每组值(例如 5)将在单个“Worker”/JVM 中结束。这意味着如果您的“连接”具有 1..N 关系并且 N 严重偏斜,您最终将得到偏斜的分区和 JVM 堆(即一个“分区”可能有 Max(N) 而另一个 Min(N) ).避免这种情况的唯一方法是尽可能使用“广播”或忍受这种行为。由于您的数据最初会均匀分布,因此混洗的数量将取决于 key 哈希。

重新分区:在“倾斜”连接之后,调用“重新分区”似乎可以在分区之间均匀地重新分配数据。所以这是一件好事,如果你有不可避免的偏斜问题。请注意,尽管此转换会触发大量洗牌,但后续操作会快得多。这样做的缺点是无法控制对象创建(见下文)

对象创建/堆污染:您设法加入您的数据,认为重新分区是重新平衡集群的好主意,但由于某种原因,“重新分区”会触发“OOME”。发生的情况是最初连接的数据重新使用连接的对象。当您触发“重新分区”或任何其他涉及洗牌的“操作”时,例如一个额外的连接或“groupBy”(后跟一个“Action”),数据被序列化,所以你失去了对象的重用。一旦对象被反序列化,它们现在就是新实例。另请注意,在序列化过程中,重用会丢失,因此 suffle 会很重。因此,在我的例子中,1..1000000 连接(其中 1 是我的“重”对象)将在触发随机播放的任何操作后失败。

解决方法/调试:

  1. 我使用“mapPartitionsWithIndex”通过返回单个项目“Iterable>”和每个分区的计数来调试分区大小。这非常有用,因为您可以看到“重新分区”的效果以及“操作”后分区的状态。
  2. 您可以在连接 RDD 上使用“countByKeyApprox”或“countByKey”来了解基数,然后分两步应用连接。为高基数键使用“广播”,为低基数键使用“加入”。将这些操作包装在“rdd.cache()”和“rdd.unpersist()” block 中将显着加快此过程。虽然这可能会使您的代码稍微复杂一些,但它会提供更好的性能,尤其是在您执行后续操作时。另请注意,如果您在每个“ map ”中使用“广播”进行查找,您还将显着减少改组大小。
  3. 调用影响分区数量的其他操作的“重新分区”可能非常有用,但请注意(随机)大量分区会导致更多的偏斜,因为给定键的大集合将创建大分区,但其他分区的大小将很小或为 0。创建调试方法来获取分区的大小将帮助您选择合适的大小。

关于java - 请问 Spark 会安排加入吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30412325/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com