gpt4 book ai didi

python - 多处理:如何 mp.map 将元素存储在列表中的函数?

转载 作者:太空宇宙 更新时间:2023-11-04 09:30:25 25 4
gpt4 key购买 nike

我有一个类似于下面的程序:

import time
from multiprocessing import Pool

class a_system():
def __init__(self,N):
self.N = N
self.L = [0 for _ in range(self.N)]
def comp(self,n):
self.L[n] = 1
return self.L
def reset(self):
self.L = [0 for _ in range(self.N)]

def individual_sim(iter):
global B, L, sys
sys.reset()
L[iter] = sys.comp(iter)
B += sum(L[iter])
time.sleep(1)
return L, B

def simulate(N_mc):
global B, L, sys
L = [[] for _ in range(N_mc)]
B = 0
sys = a_system(N_mc)
[*map(individual_sim, range(N_mc))]
# with Pool() as P:
# P.map(individual_sim,range(N_mc))
return L, B

if __name__=="__main__":
start = time.time()
L, B = simulate(N_mc=5)
print(L)
print(B)
print("Time elapsed: ",time.time()-start)

在这里,我想将 [*map(individual_sim, range(N_mc))] 行与多处理并行化。但是,将此行替换为

with Pool() as P:
P.map(individual_sim,range(N_mc))

返回一个空列表。

如果我改为使用 P.map_asyncP.imapP.imap_unordered,我不会收到错误,但是列表和 B 留空。

我如何并行化这段代码?

附言我试过 multiprocessing.pool 中的 ThreadPool,但我想避免这种情况,因为类 a_system 有点复杂此处显示的副本需要为每个工作人员准备一份不同的副本(我得到一个退出代码 139(被信号 11 中断:SIGSEGV))。

附言2我可能会尝试使用 sharedctypes 或 Managers (?),但我不确定它们是如何工作的,也不确定我应该使用哪一个(或组合使用?)。

附注3我还尝试将 individual_sim 修改为

def individual_sim(iter,B,L,sys):
sys.reset()
L[iter] = sys.comp(iter)
B += sum(L[iter])
time.sleep(1)
return L, B

并在模拟中使用以下内容:

   from functools import partial
part_individual_sim = partial(individual_sim, B=B, L=L, sys=sys)
with Pool() as P:
P.map(part_individual_sim,range(N_mc))

但我仍然得到空列表。

最佳答案

我不太清楚这里的业务逻辑是什么,但是您不能从子进程中修改父进程中的全局变量。单独的进程不共享它们的地址空间。

您可以将 L 设为 Manager.List 并将 B 设为 Manager.Value 以从您的但是,工作进程。管理器对象存在于单独的服务器进程中,您可以使用代理对象修改它们。此外,您需要在修改这些共享对象时使用 Manager.Lock 以防止数据损坏。

这是一个精简的示例,可以帮助您入门:

import time
from multiprocessing import Pool, Manager


def individual_sim(mlist, mvalue, mlock, idx):
# in your real computation, make sure to not hold the lock longer than
# really needed (e.g. calculations without holding lock)
with mlock:
mlist[idx] += 10
mvalue.value += sum(mlist)


def simulate(n_workers, n):

with Manager() as m:
mlist = m.list([i for i in range(n)])
print(mlist)
mvalue = m.Value('i', 0)
mlock = m.Lock()

iterable = [(mlist, mvalue, mlock, i) for i in range(n)]

with Pool(processes=n_workers) as pool:
pool.starmap(individual_sim, iterable)

# convert to non-shared objects before terminating manager
mlist = list(mlist)
mvalue = mvalue.value

return mlist, mvalue


if __name__=="__main__":

N_WORKERS = 4
N = 20

start = time.perf_counter()
L, B = simulate(N_WORKERS, N)
print(L)
print(B)
print("Time elapsed: ",time.perf_counter() - start)

示例输出:

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
[10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
5900
Time elapsed: 0.14064819699706277

Process finished with exit code 0

也可以使用 Pool 的 initializer 参数在 worker 初始化时传递代理并将它们注册为全局变量,而不是将它们作为 starmap 调用的常规参数发送。

关于 Manager 用法的更多信息(相关:嵌套代理)我写了 here .

关于python - 多处理:如何 mp.map 将元素存储在列表中的函数?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55982600/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com