gpt4 book ai didi

python - 在 pyspark 中累积数据帧的最有效方法是什么?

转载 作者:太空宇宙 更新时间:2023-11-03 11:25:15 24 4
gpt4 key购买 nike

我有一个数据框(或者可以是任何 RDD),它在一个众所周知的模式中包含数百万行,如下所示:

Key | FeatureA | FeatureB
--------------------------
U1 | 0 | 1
U2 | 1 | 1

我需要从磁盘加载十几个其他数据集,这些数据集包含相同数量的键的不同特征。一些数据集最多有十几个列宽。想象一下:

Key | FeatureC | FeatureD |  FeatureE
-------------------------------------
U1 | 0 | 0 | 1

Key | FeatureF
--------------
U2 | 1

感觉就像折叠或积累,我只想迭代所有数据集并返回如下内容:

Key | FeatureA | FeatureB | FeatureC | FeatureD | FeatureE | FeatureF 
---------------------------------------------------------------------
U1 | 0 | 1 | 0 | 0 | 1 | 0
U2 | 1 | 1 | 0 | 0 | 0 | 1

我已经尝试加载每个数据帧然后加入,但是一旦我通过了一些数据集就需要很长时间。我是否缺少完成此任务的通用模式或有效方法?

最佳答案

假设每个 DataFrame 中的每个键最多有一行,并且所有键都是原始类型,您可以尝试使用聚合进行联合。让我们从一些导入和示例数据开始:

from itertools import chain
from functools import reduce
from pyspark.sql.types import StructType
from pyspark.sql.functions import col, lit, max
from pyspark.sql import DataFrame

df1 = sc.parallelize([
("U1", 0, 1), ("U2", 1, 1)
]).toDF(["Key", "FeatureA", "FeatureB"])

df2 = sc.parallelize([
("U1", 0, 0, 1)
]).toDF(["Key", "FeatureC", "FeatureD", "FeatureE"])

df3 = sc.parallelize([("U2", 1)]).toDF(["Key", "FeatureF"])

dfs = [df1, df2, df3]

接下来我们可以提取通用模式:

output_schema = StructType(
[df1.schema.fields[0]] + list(chain(*[df.schema.fields[1:] for df in dfs]))
)

并转换所有DataFrames:

transformed_dfs = [df.select(*[
lit(None).cast(c.dataType).alias(c.name) if c.name not in df.columns
else col(c.name)
for c in output_schema.fields
]) for df in dfs]

最后是联合和虚拟聚合:

combined = reduce(DataFrame.unionAll, transformed_dfs)
exprs = [max(c).alias(c) for c in combined.columns[1:]]
result = combined.repartition(col("Key")).groupBy(col("Key")).agg(*exprs)

如果每个键有多于一行,但个别列仍然是原子的,您可以尝试将 max 替换为 collect_list/collect_set 然后通过爆炸

关于python - 在 pyspark 中累积数据帧的最有效方法是什么?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35373082/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com