gpt4 book ai didi

Python 多处理 apply_async + Value

转载 作者:行者123 更新时间:2023-12-01 05:51:36 30 4
gpt4 key购买 nike

我尝试通过 apply_async 将共享计数器传递给多处理中的任务,但它失败并出现以下错误“RuntimeError:同步对象只能通过继承在进程之间共享”。这是怎么回事

def processLine(lines, counter, mutex):
pass

counter = multiprocessing.Value('i', 0)
mutex = multiprocessing.Lock()
pool = Pool(processes = 8)
lines = []

for line in inputStream:
lines.append(line)
if len(lines) >= 5000:
#don't queue more than 1'000'000 lines
while counter.value > 1000000:
time.sleep(0.05)
mutex.acquire()
counter.value += len(lines)
mutex.release()
pool.apply_async(processLine, args=(lines, counter, ), callback = collectResults)
lines = []

最佳答案

让池处理调度:

for result in pool.imap(process_single_line, input_stream):
pass

如果顺序不重要:

for result in pool.imap_unordered(process_single_line, input_stream):
pass

pool.*map*() 函数有 chunksize 参数,您可以更改该参数以查看它是否会影响您的情况下的性能。

如果您的代码需要在一次调用中传递多行:

from itertools import izip_longest

chunks = izip_longest(*[iter(inputStream)]*5000, fillvalue='') # grouper recipe
for result in pool.imap(process_lines, chunks):
pass

限制排队项目数量的一些替代方法是:

  • multiprocessing.Queue 设置最大大小(在这种情况下,您不需要池)。 queue.put() 将在达到最大大小时阻塞,直到其他进程调用 queue.get()
  • 使用多处理原语(例如 Condition 或 BoundedSemaphor)手动实现生产者/消费者模式。

注意:每个值都有关联的锁,您不需要单独的锁。

关于Python 多处理 apply_async + Value,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/14055171/

30 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com