gpt4 book ai didi

apache-spark - 无法压缩分区数不等的 RDD

转载 作者:行者123 更新时间:2023-12-04 12:07:28 25 4
gpt4 key购买 nike

现在我有 3 个这样的 RDD:

rdd1:

1 2
3 4
5 6
7 8
9 10

rdd2:
11 12
13 14

rdd3:
15 16
17 18
19 20

我想这样做:
rdd1.zip(rdd2.union(rdd3))

我希望结果是这样的:
1 2 11 12
3 4 13 14
5 6 15 16
7 8 17 18
9 10 19 20

但我有这样的异常(exception):

Exception in thread "main" java.lang.IllegalArgumentException: Can't zip RDDs with unequal numbers of partitions



有人告诉我我可以毫无异常(exception)地做到这一点:
rdd1.zip(rdd2.union(rdd3).repartition(1))

不过好像有点小成本。所以我想知道是否有其他方法可以解决这个问题。

最佳答案

我不确定您所说的“成本”是什么意思,但您怀疑 repartition(1) 是对的。不是正确的解决方案。它会将 RDD 重新分区为单个分区。

  • 如果您的数据不能放在一台机器上,这将失败。
  • 它仅适用于 rdd1有一个分区。当您有更多数据时,这可能不再成立。
  • repartition执行 shuffle,因此您的数据最终可能会以不同的方式排序。

  • 我认为正确的解决方案是放弃使用 zip ,因为您可能无法确保分区匹配。创建 key 并使用 join反而:
    val indexedRDD1 = rdd1.zipWithIndex.map { case (v, i) => i -> v }
    val indexedRDD2 = rdd2.zipWithIndex.map { case (v, i) => i -> v }
    val offset = rdd2.count
    val indexedRDD3 = rdd3.zipWithIndex.map { case (v, i) => (i + offset) -> v }
    val combined =
    indexedRDD1.leftOuterJoin(indexedRDD2).leftOuterJoin(indexedRDD3).map {
    case (i, ((v1, v2Opt), v3Opt)) => i -> (v1, v2Opt.getOrElse(v3Opt.get))
    }

    无论分区如何,这都将起作用。如果您愿意,可以对结果进行排序并删除最后的索引:
    val unindexed = combined.sortByKey().values

    关于apache-spark - 无法压缩分区数不等的 RDD,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29814499/

    25 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com