
python - Intersecting RDDs by key


I have two RDDs, one very large and the other much smaller. I want to find all tuples in the large RDD whose keys appear in the small RDD.

  • The large RDD is so big that I have to avoid a full shuffle.
  • The small RDD is also large enough that I cannot broadcast it. I may be able to broadcast just its keys.
  • There are also duplicate tuples, and I only care about the distinct ones.

For example:

large_rdd = sc.parallelize([('abcdefghij'[i%10], i) for i in range(100)] * 5)
small_rdd = sc.parallelize([('zab'[i%3], i) for i in range(10)])
expected_rdd = [
    ('a', [1, 4, 7, 0, 10, 20, 30, 40, 50, 60, 70, 80, 90]),
    ('b', [2, 5, 8, 1, 11, 21, 31, 41, 51, 61, 71, 81, 91])]

There are two expensive operations in my solution - join and distinct. I think both cause a full shuffle and leave the child RDD hash partitioned. Given that, is the following the best I can do?

keys = sc.broadcast(small_rdd.keys().distinct().collect())

filtered_unique_large_rdd = (large_rdd
    .filter(lambda kv: kv[0] in keys.value)
    .distinct()
    .groupByKey())

(filtered_unique_large_rdd
    .join(small_rdd.groupByKey())
    .mapValues(lambda x: sum([list(i) for i in x], []))
    .collect())

Basically, I filter the tuples explicitly, take the distinct ones, and then join with the smaller_rdd. I hope that the distinct operation will leave the keys hash partitioned and will not cause another shuffle during the subsequent join.
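For reference, the partitioning and lineage can be inspected directly instead of guessed at - a small sketch, assuming the RDDs defined above (the exact output depends on the Spark version):

print(filtered_unique_large_rdd.partitioner)      # partitioner left by groupByKey, or None
print(small_rdd.groupByKey().partitioner)         # join avoids a shuffle only if both sides share the same partitioner
print(filtered_unique_large_rdd.toDebugString())  # every ShuffledRDD in the lineage corresponds to one shuffle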

Thanks in advance for any suggestions/ideas.

P.S. This is not a duplicate of Which function in spark is used to combine two RDDs by keys, because there a join (full shuffle) is an option.

Best Answer

There are two expensive operations in my solution - join and distinct.

There are actually three expensive operations here. You should add groupByKey to the list.

I hope that that distinct operation will place the keys hash partitioned and will not cause another shuffle during the subsequent join.

distinct won't, but the subsequent groupByKey will. The problem is that your data has to be shuffled twice - once for the distinct and once for the groupByKey:

filtered_unique_large_rdd.toDebugString()

## (8) PythonRDD[27] at RDD at PythonRDD.scala:43 []
## | MapPartitionsRDD[26] at mapPartitions at PythonRDD.scala:374 []
## | ShuffledRDD[25] at partitionBy at NativeMethodAccessorImpl.java:-2 []
## +-(8) PairwiseRDD[24] at groupByKey at <ipython-input-11-8a3af1a8d06b>:2 []
## | PythonRDD[23] at groupByKey at <ipython-input-11-8a3af1a8d06b>:2 []
## | MapPartitionsRDD[22] at mapPartitions at PythonRDD.scala:374 []
## | ShuffledRDD[21] at partitionBy at NativeMethodAccessorImpl.java:-2 []
## +-(8) PairwiseRDD[20] at distinct at <ipython-input-11-8a3af1a8d06b>:2 []
## | PythonRDD[19] at distinct at <ipython-input-11-8a3af1a8d06b>:2 []
## | ParallelCollectionRDD[2] at parallelize at PythonRDD.scala:423 []

You can try replacing distinct followed by groupByKey with aggregateByKey:

zeroValue = set()

def seqFunc(acc, x):
    acc.add(x)
    return acc

def combFunc(acc1, acc2):
    acc1.update(acc2)
    return acc1

grouped_by_aggregate = (large_rdd
    .filter(lambda kv: kv[0] in keys.value)
    .aggregateByKey(zeroValue, seqFunc, combFunc))

Compared to your current solution, it only has to shuffle large_rdd once:

grouped_by_aggregate.toDebugString()

## (8) PythonRDD[54] at RDD at PythonRDD.scala:43 []
## | MapPartitionsRDD[53] at mapPartitions at PythonRDD.scala:374
## | ShuffledRDD[52] at partitionBy at NativeMethodAccessorImpl.java:-2 []
## +-(8) PairwiseRDD[51] at aggregateByKey at <ipython-input-60-67c93b2860a0 ...
## | PythonRDD[50] at aggregateByKey at <ipython-input-60-67c93b2860a0> ...
## | ParallelCollectionRDD[2] at parallelize at PythonRDD.scala:423 []
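The rest of the original pipeline should plug in unchanged, since list() works on the sets produced by aggregateByKey just as it does on grouped iterables - a sketch, assuming grouped_by_aggregate and small_rdd from above:

(grouped_by_aggregate
    .join(small_rdd.groupByKey())
    .mapValues(lambda x: sum([list(i) for i in x], []))
    .collect())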

Another possible improvement is to convert the keys to a set before broadcasting:

keys = sc.broadcast(set(small_rdd.keys().distinct().collect()))

Right now your code performs a linear search over the list for every element it filters.
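To see the difference, a membership test against a Python list scans the elements one by one, while a set uses a hash lookup - a toy illustration, independent of Spark:

keys_as_list = ['z', 'a', 'b']
keys_as_set = set(keys_as_list)

'b' in keys_as_list   # O(n): walks the list until it finds a match
'b' in keys_as_set    # O(1) on average: a single hash lookup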

About python - Intersecting RDDs by key: the original question can be found on Stack Overflow: https://stackoverflow.com/questions/34139273/
