gpt4 book ai didi

python - 如何使用 PySpark 进行嵌套的 for-each 循环

转载 作者:太空狗 更新时间:2023-10-30 01:32:44 25 4
gpt4 key购买 nike

想象一个大型数据集(>40GB parquet 文件),其中包含对数千个变量的观察值作为三元组(变量、时间戳、值)

现在考虑一个您只对 500 个变量的子集感兴趣的查询。并且您想检索特定时间点(观察窗口或时间范围)的那些变量的观察结果(值 --> 时间序列)。这样有一个开始和结束时间。

如果没有分布式计算 (Spark),您可以这样编写代码:

for var_ in variables_of_interest:
for incident in incidents:

var_df = df_all.filter(
(df.Variable == var_)
& (df.Time > incident.startTime)
& (df.Time < incident.endTime))

我的问题是:如何使用 Spark/PySpark 做到这一点?我在想:

  1. 以某种方式将事件与变量结合起来,然后过滤数据框。
  2. 广播事件数据帧,并在过滤变量观测值 (df_all) 时在 map 函数中使用它。
  3. 以某种方式使用 RDD.cartasian 或 RDD.mapParitions(备注:parquet 文件按变量分区保存)。

预期的输出应该是:

incident1 --> dataframe 1
incident2 --> dataframe 2
...

其中数据帧 1 包含事件 1 时间范围内的所有变量及其观测值,数据帧 2 包含事件 2 时间范围内的所有变量及其观测值。

我希望你明白了。

更新

我尝试根据想法 #1 和 zero323 给出的答案中的代码编写解决方案。工作很好,但我想知道如何在最后一步将其汇总/分组到事件中?我尝试为每个事件添加一个序列号,但在最后一步出现错误。如果您可以查看和/或完成代码,那就太好了。因此我上传了示例数据和脚本。环境为Spark 1.4(PySpark):

最佳答案

一般来说,只有第一种方法对我来说是明智的。关于记录数量和分布的精确加入策略,但您可以创建顶级数据框:

ref = sc.parallelize([(var_, incident) 
for var_ in variables_of_interest:
for incident in incidents
]).toDF(["var_", "incident"])

然后简单地加入

same_var = col("Variable") == col("var_")
same_time = col("Time").between(
col("incident.startTime"),
col("incident.endTime")
)

ref.join(df.alias("df"), same_var & same_time)

或针对特定分区执行连接:

incidents_ = sc.parallelize([
(incident, ) for incident in incidents
]).toDF(["incident"])

for var_ in variables_of_interest:
df = spark.read.parquet("/some/path/Variable={0}".format(var_))
df.join(incidents_, same_time)

可选 marking one side as small enough to be broadcasted .

关于python - 如何使用 PySpark 进行嵌套的 for-each 循环,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39155954/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com