gpt4 book ai didi

java - (Spark skewed join) 如何在没有内存问题的情况下连接两个具有高度重复键的大型 Spark RDD?

转载 作者:塔克拉玛干 更新时间:2023-11-02 08:32:44 32 4
gpt4 key购买 nike

this previous question ,我试图通过避免使用 join 来使用 Spark join避免内存问题

在这个新问题中,我正在使用 join,但试图用它修复内存问题

这是我的两个 RDD:

  1. productToCustomerRDD:
    大小:非常大,可能有数百万个不同的键
    使用 HashPartitioner
    根据键进行分区有些键将高度重复,有些则不会。

    (toast, John)
    (butter, John)
    (toast, Jane)
    (jelly, Jane)
  2. productToCountRDD:
    大小:非常大,可能有数百万个不同的 key ,太大而无法广播
    使用 HashPartitioner
    根据键进行分区键是唯一,值是购买该产品的客户数量。

    (toast, 2)
    (butter, 1)
    (jelly, 1)

我想加入这两个RDD,结果会是:

  1. customerToProductAndCountRDD:

    (toast, (John, 2))
    (butter, (John, 1))
    (toast, (Jane, 2))
    (jelly, (Jane, 1))

如果我使用 productToCustomerRDD.join(productToCountRDD) 加入两个 RDD,我会在两个分区(数千个)上得到一个 OutOfMemoryError。在 Spark UI 中,我注意到在包含 join 的阶段,在 Input Size/Records 列中,所有分区都有来自 4K700K。除了产生 OOM 的两个分区之外的所有分区:一个有 9M 记录,一个有 6M 记录。

据我了解,为了加入,需要将具有相同 key 的对进行洗牌并移动到同一分区(除非它们之前已按 key 进行分区)。然而,由于某些键非常频繁(例如:数据集中几乎每个客户都购买的产品),大量数据可能会移动到一个分区,无论是在 join或者在加入之前的重新分区期间。

我的理解正确吗?
有没有办法避免这种情况?
有没有一种方法可以join,而无需在同一分区上拥有一个高度重复的键的所有数据?

最佳答案

实际上,这是 Spark 中的一个标准问题,称为“倾斜连接”:连接的一侧是倾斜的,这意味着它的一些键比其他键更频繁。可以找到一些不适合我的答案 here .

我使用的策略是受 GraphFrame.skewedJoin() 方法的启发 here及其在 ConnectedComponents.skewedJoin() 中的使用 here .连接将通过使用广播连接连接最频繁的键和使用标准连接连接不太频繁的键来执行。

在我的示例 (OP) 中,productToCountRDD 已包含有关键频率的信息。所以它是这样的:

  • 过滤 productToCountRDD 以仅保留高于固定阈值的计数,并将 collectAsMap() 发送给驱动程序。
  • 将此 map 广播给所有执行者。
  • productToCustomerRDD 分成两个 RDD:在广播映射中找到的键(频繁键)和不在广播映射中的键(不常见键)。
  • 使用mapToPair 执行频繁键的连接,从广播映射中获取count
  • 使用 join 执行不常见键的连接。
  • 在最后使用 union 来获得完整的 RDD。

关于java - (Spark skewed join) 如何在没有内存问题的情况下连接两个具有高度重复键的大型 Spark RDD?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50588608/

32 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com