gpt4 book ai didi

python-3.x - python的multiprocessing.Queue的可迭代实现中的继承

转载 作者:行者123 更新时间:2023-12-02 18:49:57 28 4
gpt4 key购买 nike

我发现 python 的 multiprocessing.Queue 缺少默认实现,因为它不像任何其他集合那样可迭代。因此,我开始努力创建它的“子类”,添加该功能。正如您从下面的代码中看到的,它不是一个正确的子类,因为 multiprocess.Queue 不是一个direct类本身,而是一个工厂函数,真正的底层类是multiprocess.queues.Queue。我没有理解也没有精力去模仿工厂函数,这样我就可以正确地从类继承,所以我只是让新类从工厂创建它自己的实例并将其视为父类(super class)。这是代码;

from multiprocessing import Queue, Value, Lock
import queue

class QueueClosed(Exception):
pass

class IterableQueue:
def __init__(self, maxsize=0):
self.closed = Value('b', False)
self.close_lock = Lock()
self.queue = Queue(maxsize)

def close(self):
with self.close_lock:
self.closed.value = True
self.queue.close()

def put(self, elem, block=True, timeout=None):
with self.close_lock:
if self.closed.value:
raise QueueClosed()
else:
self.queue.put(elem, block, timeout)

def put_nowait(self, elem):
self.put(elem, False)

def get(self, block=True):
if not block:
return self.queue.get_nowait()
elif self.closed.value:
try:
return self.queue.get_nowait()
except queue.Empty:
return None
else:
val = None
while not self.closed.value:
try:
val = self.queue.get_nowait()
break
except queue.Empty:
pass
return val

def get_nowait(self):
return self.queue.get_nowait()

def join_thread(self):
return self.queue.join_thread()

def __iter__(self):
return self

def __next__(self):
val = self.get()
if val == None:
raise StopIteration()
else:
return val

def __enter__(self):
return self

def __exit__(self, *args):
self.close()

这允许我实例化一个 IterableQueue 对象,就像普通的 multiprocessing.Queue 一样,像平常一样将元素放入其中,然后在子消费者内部,简单地循环它像这样;

from iterable_queue import IterableQueue
from multiprocessing import Process, cpu_count
import os

def fib(n):
if n < 2:
return n
return fib(n-1) + fib(n-2)

def consumer(queue):
print(f"[{os.getpid()}] Consuming")
for i in queue:
print(f"[{os.getpid()}] < {i}")
n = fib(i)
print(f"[{os.getpid()}] {i} > {n}")
print(f"[{os.getpid()}] Closing")

def producer():
print("Enqueueing")
with IterableQueue() as queue:
procs = [Process(target=consumer, args=(queue,)) for _ in range(cpu_count())]
[p.start() for p in procs]
[queue.put(i) for i in range(36)]
print("Finished")

if __name__ == "__main__":
producer()

而且它的工作几乎无缝;一旦队列关闭,消费者就会退出循环,但只有在耗尽所有剩余元素之后。然而,我对缺乏继承方法感到不满意。为了模仿实际的继承行为,我尝试向类添加以下元函数调用;

def __getattr__(self, name):
if name in self.__dict__:
return self.__dict__[name]
else:
return self.queue.__getattr__[name]

但是,当在子 multiprocessing.Process 线程内操作 IterableQueue 类的实例时,此操作会失败,因为该类的 __dict__ 属性不是保存在其中。我试图通过用 multiprocessing.Manager().dict() 替换类的默认 __dict__ 来以一种 hacky 的方式来解决这个问题,如下所示;

def __init__(self, maxsize=0):
self.closed = Value('b', False)
self.close_lock = Lock()
self.queue = Queue(maxsize)
self.__dict__ = Manager().dict(self.__dict__)

但是,这样做时,我收到一条错误,指出RuntimeError:同步对象只能通过继承在进程之间共享。所以我的问题是,我应该如何正确地从 Queue 类继承,以便子类继承对其所有属性的访问权限?此外,当队列为空但未关闭时,消费者都处于繁忙循环中,而不是真正的 IO block 中,从而占用宝贵的 cpu 资源。如果您对我在这段代码中可能遇到的并发和竞争条件问题有任何建议,或者我如何解决繁忙循环问题,我也愿意接受其中的建议。

<小时/>

根据MisterMiyagi提供的代码,我创建了这个通用的IterableQueue类,它可以接受任意输入,正确阻塞,并且不会卡在队列关闭上;

from multiprocessing.queues import Queue
from multiprocessing import get_context

class QueueClosed(Exception):
pass

class IterableQueue(Queue):
def __init__(self, maxsize=0, *, ctx=None):
super().__init__(
maxsize=maxsize,
ctx=ctx if ctx is not None else get_context()
)

def close(self):
super().put((None, False))
super().close()

def __iter__(self):
return self

def __next__(self):
try:
return self.get()
except QueueClosed:
raise StopIteration

def get(self, *args, **kwargs):
result, is_open = super().get(*args, **kwargs)
if not is_open:
super().put((None, False))
raise QueueClosed
return result

def put(self, val, *args, **kwargs):
super().put((val, True), *args, **kwargs)

def __enter__(self):
return self

def __exit__(self, *args):
self.close()

最佳答案

multiprocess.Queue 包装器仅用于 use the default context .

def Queue(self, maxsize=0):
'''Returns a queue object'''
from .queues import Queue
return Queue(maxsize, ctx=self.get_context())

继承时,您可以在__init__方法中复制它。这允许您继承整个Queue行为。您只需要添加迭代器方法:

from multiprocessing.queues import Queue
from multiprocessing import get_context


class IterableQueue(Queue):
"""
``multiprocessing.Queue`` that can be iterated to ``get`` values

:param sentinel: signal that no more items will be received
"""
def __init__(self, maxsize=0, *, ctx=None, sentinel=None):
self.sentinel = sentinel
super().__init__(
maxsize=maxsize,
ctx=ctx if ctx is not None else get_context()
)

def close(self):
self.put(self.sentinel)
# wait until buffer is flushed...
while self._buffer:
time.sleep(0.01)
# before shutting down the sender
super().close()

def __iter__(self):
return self

def __next__(self):
result = self.get()
if result == self.sentinel:
# re-queue sentinel for other listeners
self.put(result)
raise StopIteration
return result

请注意,指示队列末尾的sentinel 是通过相等性进行比较的,因为身份不会跨进程保留。常用的 queue.Queue 哨兵 object() 无法正常工作。

关于python-3.x - python的multiprocessing.Queue的可迭代实现中的继承,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57207887/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com