gpt4 book ai didi

python - Pyspark reduceByKey 嵌套元组

转载 作者:行者123 更新时间:2023-12-01 03:45:09 27 4
gpt4 key购买 nike

我的问题类似于PySpark reduceByKey on multiple values但有一个关键的区别。我是 PySpark 的新手,所以我肯定错过了一些明显的东西。

我有一个具有以下结构的 RDD:

(K0, ((k01,v01), (k02,v02), ...))
....
(Kn, ((kn1,vn1), (kn2,vn2), ...))

我想要的输出是这样的

(K0, v01+v02+...)
...
(Kn, vn1+vn2+...)

这似乎是使用reduceByKey的完美案例,起初我想到了类似的东西

rdd.reduceByKey(lambda x,y: x[1]+y[1])

这给了我一开始使用的 RDD。我认为我的索引有问题,因为存在嵌套元组,但我已经尝试了我能想到的所有可能的索引组合,并且它不断地给我返回初始 RDD。

是否有可能有原因导致它不适用于嵌套元组,或者我做错了什么?

最佳答案

您根本不应该在此处使用reduceByKey。它需要带有签名的关联和交换函数。 (T, T) => T。很明显,当您将 List[Tuple[U, T]] 作为输入并且希望 T 作为输出时,它不适用。

由于尚不清楚键是否唯一,让我们考虑一下当我们必须在本地和全局进行聚合时的一般示例。假设 v01v02、...vm 是简单的数字:

from functools import reduce
from operator import add

def agg_(xs):
# For numeric values sum would be more idiomatic
# but lets make it more generic
return reduce(add, (x[1] for x in xs), zero_value)

zero_value = 0
merge_op = add
def seq_op(acc, xs):
return acc + agg_(xs)

rdd = sc.parallelize([
("K0", (("k01", 3), ("k02", 2))),
("K0", (("k03", 5), ("k04", 6))),
("K1", (("k11", 0), ("k12", -1)))])

rdd.aggregateByKey(0, seq_op, merge_op).take(2)
## [('K0', 16), ('K1', -1)]

如果键已经是唯一的,简单的 mapValues 就足够了:

from itertools import chain

unique_keys = rdd.groupByKey().mapValues(lambda x: tuple(chain(*x)))
unique_keys.mapValues(agg_).take(2)
## [('K0', 16), ('K1', -1)]

关于python - Pyspark reduceByKey 嵌套元组,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39054489/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com