gpt4 book ai didi

python - 包含字典的 pyspark 数据框列的总和

转载 作者:行者123 更新时间:2023-11-30 22:12:12 25 4
gpt4 key购买 nike

我有一个仅包含一列的数据框,该列具有 MapType(StringType(), IntegerType()) 类型的元素。我想获得该列的累积和,其中 sum 操作意味着添加两个字典。

最小示例

a = [{'Maps': ({'a': 1, 'b': 2, 'c': 3})}, {'Maps': ({'a': 2, 'b': 4, 'd': 6})}]
df = spark.createDataFrame(a)
df.show(5, False)

+---------------------------+
|Maps |
+---------------------------+
|Map(a -> 1, b -> 2, c -> 3)|
|Map(a -> 2, b -> 4, d -> 6)|
+---------------------------+

如果我要获取Maps列的累积和,我应该得到以下结果。

+-----------------------------------+
|Maps |
+-----------------------------------+
|Map(a -> 3, b -> 6, c -> 3, d -> 6)|
+-----------------------------------+

P。 S.我使用的是Python 2.6,所以collections.Counter不可用。如果绝对必要的话我可能可以安装它。

我的尝试:

我尝试了基于累加器的方法和使用折叠的方法。

累加器

def addDictFun(x):
global v
v += x

class DictAccumulatorParam(AccumulatorParam):
def zero(self, d):
return d
def addInPlace(self, d1, d2):
for k in d1:
d1[k] = d1[k] + (d2[k] if k in d2 else 0)
for k in d2:
if k not in d1:
d1[k] = d2[k]
return d1

v = sc.accumulator(MapType(StringType(), IntegerType()), DictAccumulatorParam())
cumsum_dict = df.rdd.foreach(addDictFun)

最后,我应该将生成的字典保存在 v 中。相反,我收到错误 MapType is not iterable(主要位于函数 addInPlace 中的 for k in d1 行)。

rdd.fold

基于rdd.fold的方法如下:

def add_dicts(d1, d2):
for k in d1:
d1[k] = d1[k] + (d2[k] if k in d2 else 0)
for k in d2:
if k not in d1:
d1[k] = d2[k]
return d1

cumsum_dict = df.rdd.fold(MapType(StringType(), IntegerType()), add_dicts)

但是,我在这里得到了相同的 MapType is not iterable 错误。知道我哪里出错了吗?

最佳答案

pyspark.sql.types 是模式描述符,而不是集合或外部语言表示,因此不能与 foldAccumulator 一起使用。

最直接的解决方案是分解并聚合

from pyspark.sql.functions import explode

df = spark.createDataFrame(
[{'a': 1, 'b': 2, 'c': 3}, {'a': 2, 'b': 4, 'd': 6}],
"map<string,integer>"
).toDF("Maps")

df.select(explode("Maps")).groupBy("key").sum("value").rdd.collectAsMap()
# {'d': 6, 'c': 3, 'b': 6, 'a': 3}

使用RDD你可以做类似的事情:

from operator import add

df.rdd.flatMap(lambda row: row.Maps.items()).reduceByKey(add).collectAsMap()
# {'b': 6, 'c': 3, 'a': 3, 'd': 6}

或者如果你真的想要折叠

from operator import attrgetter
from collections import defaultdict

def merge(acc, d):
for k in d:
acc[k] += d[k]
return acc

df.rdd.map(attrgetter("Maps")).fold(defaultdict(int), merge)
# defaultdict(int, {'a': 3, 'b': 6, 'c': 3, 'd': 6})

关于python - 包含字典的 pyspark 数据框列的总和,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51181384/

25 4 0
文章推荐: c# - 应用程序池存储在哪里
文章推荐: c# -