gpt4 book ai didi

python - 我是否需要将 multiprocessing.Queue 实例变量显式传递给在实例方法上执行的子进程?

转载 作者:太空狗 更新时间:2023-10-29 18:06:34 25 4
gpt4 key购买 nike

关于使用 Python 的 multiprocessing 模块,我有几个基本问​​题:

class Someparallelworkerclass(object) :

def __init__(self):
self.num_workers = 4
self.work_queue = multiprocessing.JoinableQueue()
self.result_queue = multiprocessing.JoinableQueue()

def someparallellazymethod(self):
p = multiprocessing.Process(target=self.worktobedone).start()

def worktobedone(self):
# get data from work_queue
# put back result in result queue

是否需要将work_queueresult_queue作为args传递给Process?答案取决于操作系统吗?更基本的问题是:子进程是否从父进程获得复制的(COW)地址空间,从而知道类/类方法的定义?如果是,它如何知道要为 IPC 共享队列,以及它不应该在子进程中复制 work_queueresult_queue?我尝试在线搜索此内容,但我发现的大部分文档都含糊不清,并且没有深入了解到底发生了什么。

最佳答案

在这种情况下,实际上没有必要在 args 参数中包含队列,无论您使用的是什么平台。原因是,即使看起来您没有明确地将两个 JoinableQueue 实例传递给 child ,您实际上是通过 self。因为 self 明确地传递给了 child ,而这两个队列是 self 的一部分,所以它们最终被传递给了 child .

在 Linux 上,这通过 os.fork() 发生,这意味着 multiprocessing.connection.Connection 对象使用的文件描述符 Queue 内部用于进程间通信的是子进程继承(不是复制)。 Queue 的其他部分变为copy-on-write,但这没关系; multiprocessing.Queue 的设计使得需要复制的片段实际上都不需要在两个进程之间保持同步。事实上,许多内部属性在 fork 发生后被重置:

def _after_fork(self):
debug('Queue._after_fork()')
self._notempty = threading.Condition(threading.Lock())
self._buffer = collections.deque()
self._thread = None
self._jointhread = None
self._joincancelled = False
self._closed = False
self._close = None
self._send = self._writer.send # _writer is a
self._recv = self._reader.recv
self._poll = self._reader.poll

所以这涵盖了 Linux。 Windows 怎么样? Windows 没有 fork,因此它需要 pickle self 才能将其发送给子进程,这包括 pickle 我们的 Queues。现在,如果您尝试 pickle multiprocessing.Queue,通常会失败:

>>> import multiprocessing
>>> q = multiprocessing.Queue()
>>> import pickle
>>> pickle.dumps(q)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/usr/local/lib/python2.7/pickle.py", line 1374, in dumps
Pickler(file, protocol).dump(obj)
File "/usr/local/lib/python2.7/pickle.py", line 224, in dump
self.save(obj)
File "/usr/local/lib/python2.7/pickle.py", line 306, in save
rv = reduce(self.proto)
File "/usr/local/lib/python2.7/copy_reg.py", line 84, in _reduce_ex
dict = getstate()
File "/usr/local/lib/python2.7/multiprocessing/queues.py", line 77, in __getstate__
assert_spawning(self)
File "/usr/local/lib/python2.7/multiprocessing/forking.py", line 52, in assert_spawning
' through inheritance' % type(self).__name__
RuntimeError: Queue objects should only be shared between processes through inheritance

但这其实是人为的限制。 multiprocessing.Queue 对象可以在某些情况下被 pickle - 否则它们如何被发送到 Windows 中的子进程?事实上,如果我们查看实现,我们可以看到:

def __getstate__(self):
assert_spawning(self)
return (self._maxsize, self._reader, self._writer,
self._rlock, self._wlock, self._sem, self._opid)

def __setstate__(self, state):
(self._maxsize, self._reader, self._writer,
self._rlock, self._wlock, self._sem, self._opid) = state
self._after_fork()

__getstate__,在 pickle 一个实例时被调用,其中有一个 assert_spawning 调用,这确保我们在尝试 pickle* 时实际上是在生成一个进程。 __setstate__,unpickling时调用,负责调用_after_fork

那么当我们必须 pickle 时,队列使用的 Connection 对象是如何维护的呢?原来有一个 multiprocessing 子模块可以做到这一点 - multiprocessing.reduction。模块顶部的注释非常清楚地说明了这一点:

#
# Module to allow connection and socket objects to be transferred
# between processes
#

在 Windows 上,该模块最终使用 DuplicateHandle Windows 提供的 API 用于创建子进程的 Connection 对象可以使用的重复句柄。因此,虽然每个进程都有自己的句柄,但它们是完全重复的——对一个进程执行的任何操作都会反射(reflect)在另一个进程上:

The duplicate handle refers to the same object as the original handle. Therefore, any changes to the object are reflected through both handles. For example, if you duplicate a file handle, the current file position is always the same for both handles.

* 参见 this answer有关 assert_spawning

的更多信息

关于python - 我是否需要将 multiprocessing.Queue 实例变量显式传递给在实例方法上执行的子进程?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26225108/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com