gpt4 book ai didi

python - 是否可以手动锁定/解锁队列?

转载 作者:太空宇宙 更新时间:2023-11-04 03:53:53 25 4
gpt4 key购买 nike

我很好奇是否有办法手动锁定 multiprocessing.Queue 对象。

我有一个非常标准的生产者/消费者模式设置,其中我的主线程不断产生一系列值,multiprocessing.Process 工作池对产生的值进行操作。

它全部由一个单独的 multiprocessing.Queue() 控制。

import time
import multiprocessing


class Reader(multiprocessing.Process):
def __init__(self, queue):
multiprocessing.Process.__init__(self)
self.queue = queue

def run(self):
while True:
item = self.queue.get()
if isinstance(item, str):
break


if __name__ == '__main__':

queue = multiprocessing.Queue()
reader = Reader(queue)
reader.start()


start_time = time.time()
while time.time() - start_time < 10:
queue.put(1)
queue.put('bla bla bla sentinal')
queue.join()

我遇到的问题是,我的工作线程池无法像主线程向其中插入值那样快地消耗和处理 queue。因此,一段时间后,Queue 变得笨拙,以至于弹出 MemoryError。

一个明显的解决方案是简单地在生产者中添加一个等待检查,以阻止它向队列中放入更多的值。类似的东西:

while time.time() - start_time < 10: 
queue.put(1)
while queue.qsize() > some_size:
time.sleep(.1)
queue.put('bla bla bla sentinal')
queue.join()

但是,由于该程序的时髦性质,我想将队列中的所有内容转储到文件中以供以后处理。但!如果不能暂时锁定队列,工作人员就无法消费队列中的所有内容,因为生产者不断地用垃圾填满它——无论如何在概念上都是如此。经过无数次测试后,似乎在某个时候其中一把锁获胜(但通常是添加到队列中的那把锁)。

编辑:此外,我意识到可以简单地停止生产者并从该线程使用它......但这让我内心的单一责任人感到难过,因为生产者是生产者,而不是消费者.

编辑:

在查看了Queue 的源码后,我想到了这个:

def dump_queue(q):
q._rlock.acquire()
try:
res = []
while not q.empty():
res.append(q._recv())
q._sem.release()
return res
finally:
q._rlock.release()

但是,我太害怕使用它了!我不知道这是否“正确”。我没有足够的把握知道这是否会在不破坏任何 Queue 内部结构的情况下保持下去。

有人知道这会不会坏吗? :)

最佳答案

鉴于评论中所说的内容,Queue 只是针对您的问题的错误数据结构 - 但很可能是可用解决方案的一部分

听起来您只有一个制作人。创建一个新的本地生产者(跨进程共享)类来实现您真正需要的语义。例如,

class FlushingQueue:
def __init__(self, mpqueue, path_to_spill_file, maxsize=1000, dumpsize=1000000):
from collections import deque
self.q = mpqueue # a shared `multiprocessing.Queue`
self.dump_path = path_to_spill_file
self.maxsize = maxsize
self.dumpsize = dumpsize
self.d = deque() # buffer for overflowing values

def put(self, item):
if self.q.qsize() < self.maxsize:
self.q.put(item)
# in case consumers have made real progress
while self.d and self.q.qsize() < self.maxsize:
self.q.put(self.d.popleft())
else:
self.d.append(item)
if len(self.d) >= self.dumpsize:
self.dump()

def dump(self):
# code to flush self.d to the spill file; no
# need to look at self.q at all

我敢打赌你可以完成这项工作:-)

关于python - 是否可以手动锁定/解锁队列?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/19780750/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com