gpt4 book ai didi

python - 使用 spark(PySpark) 进行多处理

转载 作者:太空狗 更新时间:2023-10-29 21:45:14 24 4
gpt4 key购买 nike

<分区>

用例如下:

我有一个大数据框,其中有一个“user_id”列(每个 user_id 可以出现在很多行中)。我有一个用户列表 my_users 我需要对其进行分析。

Groupbyfilteraggregate 可能是个好主意,但 pyspark 中包含的可用聚合函数不符合我的需要。在 pyspark ver 中,user defined aggregation functions 仍然没有得到完全支持,我决定暂时保留它..

相反,我只是迭代 my_users 列表,过滤数据框中的每个用户,然后进行分析。为了优化这个过程,我决定为 my_users

中的每个用户使用 python 多处理池

执行分析(并传递给池)的函数有两个参数:user_id 和一个到主数据框的路径,我在其上执行所有操作计算(PARQUET 格式)。在该方法中,我加载数据框并对其进行处理(DataFrame 本身不能作为参数传递)

我在某些进程(每次运行都不同)上遇到各种奇怪的错误,看起来像:

  • JVM 中不存在 PythonUtils(读取“parquet”数据帧时)

print screen of the error message

  • KeyError: 'c' not found(还有,在读取 'parquet' 数据框时。'c' 到底是什么??)

当我在没有任何多处理的情况下运行它时,一切运行顺利,但速度很慢..

知道这些错误是从哪里来的吗?

为了让事情更清楚,我将放一些代码示例:

PYSPRAK_SUBMIT_ARGS = '--driver-memory 4g --conf spark.driver.maxResultSize=3g --master local[*] pyspark-shell' #if it's relevant

# ....

def users_worker(df_path, user_id):
df = spark.read.parquet(df_path) # The problem is here!
## the analysis of user_id in df is here

def user_worker_wrapper(args):
users_worker(*args)

def analyse():
# ...
users_worker_args = [(df_path, user_id) for user_id in my_users]
users_pool = Pool(processes=len(my_users))
users_pool.map(users_worker_wrapper, users_worker_args)
users_pool.close()
users_pool.join()

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com