gpt4 book ai didi

python - Python 中的流水线生成器

转载 作者:行者123 更新时间:2023-11-28 19:19:50 25 4
gpt4 key购买 nike

我有一个 Python 数据生成器和一组我想对该数据执行的昂贵操作。粗略地说,对于每个数据,我想执行 d(g(h(q(x)))) 其中 x 是数据。我想通过使用处理管道部分隐藏执行这些操作的代码。

另一种思考问题的方法是,在每个阶段,我都希望一组工作人员通过队列读取上一个阶段的结果,进行处理,然后将结果放入另一个队列。

我目前的解决方案(有效)是:

from multiprocessing.pool import ThreadPool 

class FuncIterator(object):
def __init__(self, func, base_iterator, pool_size=10):

self.func = func
self.base_iterator = base_iterator

self.pool = ThreadPool(pool_size)

def __iter__(self):
aa = self.pool.imap(self.func, self.base_iterator, chunksize=1)

for item in aa:
yield item

这个解决方案的问题是队列是无界的;也就是说,生产者可以任意领先于消费者,这可能会导致无限制的内存使用。我想限制中间队列的大小以防止这种情况发生。

我的第一个想法是使用显式Queue:

from multiprocessing.pool import Queue

def get_queue(func, f_iter, maxsize=5):
queue = Queue.Queue(maxsize=maxsize)

def runner(source):
for entry in source:
queue.put(func(entry), True)
queue.put(StopIteration)

process = ThreadPool.Process(target=runner, args=(f_iter,))
process.start()
return queue

但是我该如何控制使用了多少个 worker?

最佳答案

这是我想出的解决方案。它采用我原来的解决方案,使用无界队列问题并使用 Semaphore:

def _do_sem2(sem, x):
sem.acquire()
return x

class FuncIterator(object):
def __init__(self, func, base_iterator, pool_size=10, queue_size=10):

self.func = func
self.base_iterator = base_iterator

self.pool = ThreadPool(pool_size)
self.sem = BoundedSemaphore(queue_size)

def __iter__(self):
aa = self.pool.imap(self.func, (_do_sem2(self.sem, x) for x in self.base_iterator), chunksize=1)

for item in aa:
self.sem.release()
yield item

关于python - Python 中的流水线生成器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28071946/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com