
Python multiprocessing with start method 'spawn' does not work

Reposted. Author: 行者123. Updated: 2023-12-05 01:40:03

I wrote a Python class to draw plots in parallel. It works fine on Linux, where the default start method is fork, but I ran into problems when I tried it on Windows (reproducible on Linux with the spawn start method; see the code below). I always get this error:

Traceback (most recent call last):
File "test.py", line 50, in <module>
test()
File "test.py", line 7, in test
asyncPlotter.saveLinePlotVec3("test")
File "test.py", line 41, in saveLinePlotVec3
args=(test, ))
File "test.py", line 34, in process
p.start()
File "C:\Users\adrian\AppData\Local\Programs\Python\Python37\lib\multiprocessing\process.py", line 112, in start
self._popen = self._Popen(self)
File "C:\Users\adrian\AppData\Local\Programs\Python\Python37\lib\multiprocessing\context.py", line 223, in _Popen
return _default_context.get_context().Process._Popen(process_obj)
File "C:\Users\adrian\AppData\Local\Programs\Python\Python37\lib\multiprocessing\context.py", line 322, in _Popen
return Popen(process_obj)
File "C:\Users\adrian\AppData\Local\Programs\Python\Python37\lib\multiprocessing\popen_spawn_win32.py", line 89, in __init__
reduction.dump(process_obj, to_child)
File "C:\Users\adrian\AppData\Local\Programs\Python\Python37\lib\multiprocessing\reduction.py", line 60, in dump
ForkingPickler(file, protocol).dump(obj)
TypeError: can't pickle weakref objects

C:\Python\MonteCarloTools>Traceback (most recent call last):
File "<string>", line 1, in <module>
File "C:\Users\adrian\AppData\Local\Programs\Python\Python37\lib\multiprocessing\spawn.py", line 99, in spawn_main
new_handle = reduction.steal_handle(parent_pid, pipe_handle)
File "C:\Users\adrian\AppData\Local\Programs\Python\Python37\lib\multiprocessing\reduction.py", line 82, in steal_handle
_winapi.PROCESS_DUP_HANDLE, False, source_pid)
OSError: [WinError 87] The parameter is incorrect

I hope there is a way to make this code work on Windows. Here is a link describing the different start methods available on Linux and Windows: https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods
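The difference between the start methods can be seen in a small standalone sketch; forcing the 'spawn' context on Linux reproduces the Windows behaviour (the hello function is just an illustrative stand-in):

```python
import multiprocessing as mp

def hello(name):
    # Under 'spawn' the child starts a fresh interpreter and imports this
    # module, so the target and its arguments must be picklable.
    print("hello from", name)

if __name__ == "__main__":
    # e.g. ['fork', 'spawn', 'forkserver'] on Linux, ['spawn'] on Windows
    print(mp.get_all_start_methods())
    ctx = mp.get_context("spawn")  # force Windows-like behaviour anywhere
    p = ctx.Process(target=hello, args=("child",))
    p.start()
    p.join()
```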

import multiprocessing as mp
import time


def test():
    manager = mp.Manager()
    asyncPlotter = AsyncPlotter(manager.Value('i', 0))

    asyncPlotter.saveLinePlotVec3("test")
    asyncPlotter.saveLinePlotVec3("test")

    asyncPlotter.join()


class AsyncPlotter():

    def __init__(self, nc, processes=mp.cpu_count()):
        self.nc = nc
        self.pids = []
        self.processes = processes

    def linePlotVec3(self, nc, processes, test):
        self.waitOnPool(nc, processes)
        print(test)
        nc.value -= 1

    def waitOnPool(self, nc, processes):
        while nc.value >= processes:
            time.sleep(0.1)
        nc.value += 1

    def process(self, target, args):
        ctx = mp.get_context('spawn')
        p = ctx.Process(target=target, args=args)
        p.start()
        self.pids.append(p)

    def saveLinePlotVec3(self, test):
        self.process(target=self.linePlotVec3,
                     args=(self.nc, self.processes, test))

    def join(self):
        for p in self.pids:
            p.join()


if __name__ == '__main__':
    test()

Best answer

When the spawn start method is used, the Process object itself is pickled for the child process. In your code, the target=target argument is a bound method of AsyncPlotter. To pickle it, the entire asyncPlotter instance must be pickled as well, including self.manager, which apparently refuses to be pickled.
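The failure can be reproduced without multiprocessing at all: pickling a bound method drags the whole instance behind it. A minimal sketch (the Unpicklable class is a hypothetical stand-in for the manager/weakref):

```python
import pickle

class Unpicklable:
    # Stand-in for an object that refuses to be pickled (like a weakref).
    def __reduce__(self):
        raise TypeError("can't pickle Unpicklable objects")

class Plotter:
    def __init__(self):
        self.handle = Unpicklable()  # hidden unpicklable attribute

    def work(self):
        pass

try:
    # Pickling the bound method pickles the whole Plotter instance with it,
    # which is what spawn does with target=self.linePlotVec3.
    pickle.dumps(Plotter().work)
except TypeError as err:
    print("pickling failed:", err)
```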

In short, move the Manager outside of AsyncPlotter. This works on my macOS system:

def test():
    manager = mp.Manager()
    asyncPlotter = AsyncPlotter(manager.Value('i', 0))
    ...
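If the instance must still hold an awkward attribute, another common workaround (not from the original answer; the class and attribute names here are illustrative) is to exclude it from the pickled state with __getstate__, so a bound-method target can cross the spawn boundary:

```python
import pickle

class AsyncPlotterLike:
    def __init__(self, nc):
        self.nc = nc
        self.pids = []  # live Process handles: useless in a child process

    def __getstate__(self):
        # Copy the instance dict but drop what the child cannot unpickle.
        state = self.__dict__.copy()
        state["pids"] = []
        return state

plotter = AsyncPlotterLike(0)
plotter.pids.append(lambda: None)  # a lambda is a handy unpicklable stand-in
clone = pickle.loads(pickle.dumps(plotter))
print(clone.nc, clone.pids)  # 0 []
```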

Also, as noted in your comments, asyncPlotter does not work when reused. I don't know the details, but it looks related to how the Value object is shared across processes. The test function would need to look like this:

def test():
    manager = mp.Manager()
    nc = manager.Value('i', 0)

    asyncPlotter1 = AsyncPlotter(nc)
    asyncPlotter1.saveLinePlotVec3("test 1")
    asyncPlotter2 = AsyncPlotter(nc)
    asyncPlotter2.saveLinePlotVec3("test 2")

    asyncPlotter1.join()
    asyncPlotter2.join()

All in all, you may want to restructure your code and use a process pool. It already handles what AsyncPlotter does with cpu_count and parallel execution:

from multiprocessing import Pool, set_start_method
from random import random
import time


def linePlotVec3(test):
    time.sleep(random())
    print("test", test)


if __name__ == "__main__":
    set_start_method("spawn")
    with Pool() as pool:
        pool.map(linePlotVec3, range(20))

Or you could use a ProcessPoolExecutor, which does pretty much the same thing. This example starts the tasks one at a time instead of mapping over a list:

from concurrent.futures import ProcessPoolExecutor
import multiprocessing as mp
import time
from random import random


def work(i):
    r = random()
    print("work", i, r)
    time.sleep(r)


def main():
    ctx = mp.get_context("spawn")
    with ProcessPoolExecutor(mp_context=ctx) as pool:
        for i in range(20):
            pool.submit(work, i)


if __name__ == "__main__":
    main()
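If each task produces a value (for example, the path of a saved figure), the executor's futures can carry results back to the parent. A minimal sketch under the same spawn context (square is a hypothetical task):

```python
from concurrent.futures import ProcessPoolExecutor, as_completed
import multiprocessing as mp

def square(i):
    return i * i

def main():
    ctx = mp.get_context("spawn")
    with ProcessPoolExecutor(mp_context=ctx) as pool:
        # Submit returns a Future; result() blocks until the child finishes.
        futures = [pool.submit(square, i) for i in range(5)]
        results = sorted(f.result() for f in as_completed(futures))
    print(results)  # [0, 1, 4, 9, 16]

if __name__ == "__main__":
    main()
```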

Regarding Python multiprocessing with start method 'spawn' not working, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/57191393/
