gpt4 book ai didi

python - 添加额外的随机参数作为 python 3.4.7 中 pool.map 函数的参数

转载 作者:行者123 更新时间:2023-12-01 06:58:00 35 4
gpt4 key购买 nike

我想在大型数据集上使用多重处理来查找两列的乘积,并使用参数中的给定参数过滤数据集。我构建了一个测试集,但我无法让多重处理在该集上工作。

首先,我尝试在parallelize_dataframe函数中划分数据集,然后在subset_col函数中应用乘法函数和过滤函数。稍后我将完整的数据集附加回parallelize_dataframe。

import numpy as np
import pandas as pd
from multiprocessing import Pool
from multiprocessing import Lock

df = pd.DataFrame({'col1': [1, 0, 1, 1, 1, 0, 0, 1, 0, 1],
'col2': ['aa', 'aa', 'aa', 'bb', 'bb', 'bb', 'bb', 'cc', 'cc', 'cc'],
'col3': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
'col4': [21, 22, 23, 24, 25, 26, 27, 28, 29, 30]})



def subset_col(df, p):
print("Working with number: " + str(p))
df[col5] = df[col3]*df[col4]
df= df[df['col1'] == p]


def parallelize_dataframe(df, p, func, n_cores=80):
df_split = np.array_split(df, n_cores)
pool = Pool(n_cores)
df = pd.concat(pool.map(func, df_split, p))
pool.close()
pool.join()
return df


df3 = parallelize_dataframe(df,1,subset_col)


结果应该是 col3 和 col4 的乘积,其中 col1 用值过滤。但我总是收到错误消息:

File "<stdin>", line 1, in <module>
File "<stdin>", line 4, in parallelize_dataframe
struct.error: 'i' format requires -2147483648 <= number <= 2147483647

但是,如果我从所有函数中删除过滤器“p”,它就完全可以正常工作。有人可以帮我调试这个吗?

最佳答案

来自 multiprocessing.Pool.map 的官方文档,它“仅支持一个可迭代参数”。因此,您需要更改 subset_col 的接口(interface)以采用单个参数。此外,您忘记创建列字符串,从而导致名称错误。为了减少计算量,应该在乘法之前进行过滤。然后应该返回一个值,除非您的函数仅通过副作用运行(我假设您不希望这样做,因为您连接了池结果)。

def subset_col(pair):
df, p = pair
print("Working with number: " + str(p))
df = df[df['col1'] == p].copy()
df['col5'] = df['col3']
return df

接下来,我们需要修复您调用 pool.map 的方式,因为根据您正在执行的操作,它应该只需要 2 个参数(第三个,最后一个参数是 chunksize)。由于您希望每个进程使用相同的 p,因此我们将使用 p 的重复值将 dfs 压缩在一起。另外,请考虑使用上下文管理器来处理关闭资源。

def parallelize_dataframe(df, p, func, n_cores=None):
if n_cores is None:
n_cores = os.cpu_count()

dfs = np.array_split(df, n_cores)
pairs = zip(dfs, itertools.repeat(p))
with Pool(n_cores) as pool:
result = pool.map(func, pairs)

df = pd.concat(result)
return df

现在可以正确返回新的数据帧。但我对你是否拥有 80 核的机器表示怀疑。考虑实现 n_cores=NonePython dynamically figure out使用 os.cpu_count

您的计算机上有多少个核心
df3 = parallelize_dataframe(df, 1, subset_col)

根据您对 Pool.starmap 变体的请求:

def subset_col(df, p):
# remove unpacking line
...

def parallelize_dataframe(df, p, func, n_cores=None):
...
# change `pool.map(...)` to `pool.starmap(...)`
...

但是您应该注意,Pool 不提供 imapimap_unordered 替代 starmap,它们是两个惰性求值版本在是否保留顺序方面有所不同。

关于python - 添加额外的随机参数作为 python 3.4.7 中 pool.map 函数的参数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58724956/

35 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com