
multiprocessing - Why does using "fork" work but using "spawn" fail in Python 3.8+ `multiprocessing`?


I work on macOS and was recently bitten by the change of the default start method from "fork" to "spawn" in Python 3.8's multiprocessing (see doc). A simplified working example is shown below: it succeeds with "fork" but fails with "spawn". The purpose of the code is to create a custom queue object that supports calling size() under macOS, so it inherits from the Queue object and grabs the multiprocessing context.

import multiprocessing
from multiprocessing import Process
from multiprocessing.queues import Queue
from time import sleep


class Q(Queue):
    def __init__(self):
        super().__init__(ctx=multiprocessing.get_context())
        self.size = 1

    def call(self):
        return print(self.size)


def foo(q):
    q.call()


if __name__ == '__main__':
    multiprocessing.set_start_method('spawn')  # this would fail
    # multiprocessing.set_start_method('fork')  # this would succeed
    q = Q()
    p = Process(target=foo, args=(q,))
    p.start()
    p.join(timeout=1)

The error message produced when running with "spawn" is shown below.

Process Process-1:
Traceback (most recent call last):
  File "/usr/local/Cellar/python@3.8/3.8.5/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/usr/local/Cellar/python@3.8/3.8.5/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/fanchen/Private/python_work/sandbox.py", line 23, in foo
    q.call()
  File "/Users/fanchen/Private/python_work/sandbox.py", line 19, in call
    return print(self.size)
AttributeError: 'Q' object has no attribute 'size'

It appears that the child process decides self.size is not needed for execution and therefore does not copy it. My question is: why does this happen?

The snippet was tested under macOS Catalina 10.15.6 with Python 3.8.5.

Best Answer

The problem is that spawned processes do not share the parent's resources. With "spawn", the Process arguments are pickled in the parent and unpickled in the child, so the queue instance is recreated from whatever its pickling methods preserve. To recreate the queue correctly in every process, you need to add serialization and deserialization methods that carry the extra state along.
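For reference, the base class pickles only its own internal plumbing. This is roughly what the relevant method looks like in CPython 3.8's Lib/multiprocessing/queues.py (paraphrased; check your interpreter's copy for the exact code):

def __getstate__(self):
    # Only the queue's internal machinery is serialized; subclass
    # attributes such as `size` are never part of this tuple.
    context.assert_spawning(self)
    return (self._ignore_epipe, self._maxsize, self._reader, self._writer,
            self._rlock, self._wlock, self._sem, self._opid)

Under "fork" the child inherits a copy of the parent's whole address space, so self.size comes along for free; under "spawn" everything must round-trip through pickle, and self.size is silently dropped. Overriding __getstate__ and __setstate__ in the subclass fixes this. Here is working code: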

# Portable queue
# Based on an idea by Victor Terron used in the Lemon project
# (https://github.com/vterron/lemon/blob/master/util/queue.py).
# Pickling/unpickling methods are added so that Queue instances are
# shared between processes correctly.

import multiprocessing
import multiprocessing.queues


class SharedCounter(object):
    """ A synchronized shared counter.

    The locking done by multiprocessing.Value ensures that only a single
    process or thread may read or write the in-memory ctypes object. However,
    in order to do n += 1, Python performs a read followed by a write, so a
    second process may read the old value before the new one is written by the
    first process. The solution is to use a multiprocessing.Lock to guarantee
    the atomicity of the modifications to Value.

    This class comes almost entirely from Eli Bendersky's blog:
    http://eli.thegreenplace.net/2012/01/04/shared-counter-with-pythons-multiprocessing/
    """

    def __init__(self, n=0):
        self.count = multiprocessing.Value('i', n)

    def __getstate__(self):
        return (self.count,)

    def __setstate__(self, state):
        (self.count,) = state

    def increment(self, n=1):
        """ Increment the counter by n (default = 1). """
        with self.count.get_lock():
            self.count.value += n

    @property
    def value(self):
        """ Return the value of the counter. """
        return self.count.value


class Queue(multiprocessing.queues.Queue):
    """ A portable implementation of multiprocessing.Queue.

    Because of multithreading / multiprocessing semantics, Queue.qsize() may
    raise the NotImplementedError exception on Unix platforms like Mac OS X
    where sem_getvalue() is not implemented. This subclass addresses this
    problem by using a synchronized shared counter (initialized to zero) and
    increasing / decreasing its value every time the put() and get() methods
    are called, respectively. This not only prevents NotImplementedError from
    being raised, but also allows us to implement a reliable version of both
    qsize() and empty().
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs, ctx=multiprocessing.get_context())
        self._counter = SharedCounter(0)

    def __getstate__(self):
        return super().__getstate__() + (self._counter,)

    def __setstate__(self, state):
        super().__setstate__(state[:-1])
        self._counter = state[-1]

    def put(self, *args, **kwargs):
        super().put(*args, **kwargs)
        self._counter.increment(1)

    def get(self, *args, **kwargs):
        item = super().get(*args, **kwargs)
        self._counter.increment(-1)
        return item

    def qsize(self):
        """ Reliable implementation of multiprocessing.Queue.qsize(). """
        return self._counter.value

    def empty(self):
        """ Reliable implementation of multiprocessing.Queue.empty(). """
        return not self.qsize()
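
Below is a minimal usage sketch of the class above, assuming it is saved as portable_queue.py (the module name is illustrative):

# Minimal usage sketch -- assumes the portable queue above is saved
# as portable_queue.py (illustrative module name).
import multiprocessing
from portable_queue import Queue

def worker(q):
    q.put('hello')
    print(q.qsize())  # prints 1; no NotImplementedError on macOS

if __name__ == '__main__':
    multiprocessing.set_start_method('spawn')
    q = Queue()  # the shared counter now survives pickling into the child
    p = multiprocessing.Process(target=worker, args=(q,))
    p.start()
    p.join()
    print(q.get())  # prints 'hello'

Because the counter is a multiprocessing.Value, qsize() reflects puts and gets made in any process, which is what sem_getvalue() would have provided on platforms that implement it.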

For multiprocessing - Why does using "fork" work but using "spawn" fail in Python 3.8+ `multiprocessing`?, see the similar question on Stack Overflow: https://stackoverflow.com/questions/65098398/
