
python - multiprocessing: passing a class instance to pool.map

Reposted. Author: 太空宇宙. Last updated: 2023-11-03 16:38:02

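I swear I saw the following in an example somewhere, but now I can't find that example, and this doesn't work: the class's __call__ function is never called.

Edit: code updated.

pool.map appears to start the QueueWriter instance and reach its __call__ function. However, the workers never seem to start, or at least no results are ever pulled from the queue. Is my queue set up correctly? Why don't the workers fire?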

import multiprocessing as mp
import os
import random

class QueueWriter(object):
    def __init__(self, **kwargs):
        self.grid = kwargs.get("grid")
        self.path = kwargs.get("path")

    def __call__(self, q):
        print self.path
        log = open(self.path, "a", 1)
        log.write("QueueWriter called.\n")
        while 1:
            res = q.get()
            if res == 'kill':
                self.log.write("QueueWriter received 'kill' message. Closing Writer.\n")
                break
            else:
                self.log.write("This is where I'd write: {0} to grid file.\n".format(res))

        log.close()
        log = None

class Worker(object):
    def __init__(self, **kwargs):
        self.queue = kwargs.get("queue")
        self.grid = kwargs.get("grid")

    def __call__(self, idx):
        res = self.workhorse(self, idx)
        self.queue.put((idx,res))
        return res

    def workhorse(self,idx):
        #in reality a fairly complex operation
        return self.grid[idx] ** self.grid[idx]


if __name__ == '__main__':
    # log = open(os.path.expanduser('~/minimal.log'), 'w',1)
    path = os.path.expanduser('~/minimal.log')

    pool = mp.Pool(mp.cpu_count())
    manager = mp.Manager()
    q = manager.Queue()

    grid = [random.random() for _ in xrange(10000)]
    # in actuality grid is a shared resource, read by Workers and written
    # to by QueueWriter

    qWriter = QueueWriter(grid=grid, path=path)
    watcher = pool.map(qWriter, (q,),1)
    wrkr = Worker(queue=q,grid=grid)
    result = pool.map(wrkr, range(10000), 1)
    result.get()
    q.put('kill')
    pool.close()
    pool.join()

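So the log does print the initialization message, but the __call__ function is never called. Is this one of the pickling problems I see discussed so often? I've found answers about class member functions, but what about class instances?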
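One way to test the pickling theory is to round-trip the instance through pickle directly, since Pool.map pickles the callable it is given before sending it to a worker process. The sketch below is Python 3 (the question's code is Python 2), and the names PathWriter, FileWriter and can_pickle are hypothetical. It suggests that class instances are not a problem in themselves: an instance of a module-level class pickles as long as its attributes do, so one that stores only a path round-trips and stays callable, while one that stores an open file object fails.

```python
import os
import pickle
import tempfile


class PathWriter:
    """Hypothetical callable that stores only a path and opens the file on each call."""

    def __init__(self, path):
        self.path = path

    def __call__(self, line):
        # The file is opened inside whichever process runs the call,
        # so no file object is ever part of the pickled instance.
        with open(self.path, "a") as log:
            log.write(line + "\n")
        return line


class FileWriter:
    """Hypothetical class that stores an already-open file object (the pattern to avoid)."""

    def __init__(self, log):
        self.log = log


def can_pickle(obj):
    """Return True if obj survives a pickle round trip, False if pickling fails."""
    try:
        pickle.loads(pickle.dumps(obj))
        return True
    except (TypeError, pickle.PicklingError):
        return False


path = os.path.join(tempfile.mkdtemp(), "minimal.log")

# Roughly what Pool.map does with its callable: pickle it, unpickle a copy, call it.
restored = pickle.loads(pickle.dumps(PathWriter(path)))
restored("hello")

with open(path, "a") as handle:
    path_ok = can_pickle(PathWriter(path))
    file_ok = can_pickle(FileWriter(handle))

print(path_ok)  # True
print(file_ok)  # False: the open file object cannot be pickled
```

Opening the file inside __call__, as QueueWriter does, keeps the file object out of what gets pickled.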

Best answer

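Under martineau's gentle and patient prodding (thanks!), I think I've solved the problem. I haven't applied it to my original code yet, but it works in the example above, and I'll open new questions for any future implementation problems.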

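So, besides changing where in the code the target file (the log, in this example) is opened, I also start the QueueWriter instance as a single multiprocessing Process instead of using pool.map. As martineau pointed out, the map call blocks until qWriter.__call__() returns. Since __call__ only returns after it receives 'kill', and 'kill' is only sent after the workers have run, the workers are never called.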
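The blocking problem can be reproduced without separate processes. This Python 3 sketch swaps in multiprocessing.dummy.Pool, a thread-backed pool with the same interface as multiprocessing.Pool, and a plain queue.Queue in place of the Manager queue, so nothing needs to be pickled; the writer function, the simplified Worker and the squaring workload are illustrative assumptions. Instead of the answer's separate mp.Process, it shows a second option: submitting the writer with apply_async, which returns an AsyncResult immediately rather than waiting for the writer to finish.

```python
import queue
from multiprocessing.dummy import Pool  # thread-backed, same API as multiprocessing.Pool

SENTINEL = "kill"


def writer(q):
    """Collect results until the sentinel arrives, like QueueWriter.__call__."""
    # iter(q.get, SENTINEL) calls q.get() repeatedly and stops at SENTINEL.
    return list(iter(q.get, SENTINEL))


class Worker:
    """Simplified version of the question's Worker: compute, enqueue, return."""

    def __init__(self, q, grid):
        self.q = q
        self.grid = grid

    def __call__(self, idx):
        res = self.grid[idx] ** 2
        self.q.put((idx, res))
        return res


q = queue.Queue()
grid = list(range(10))

pool = Pool(4)  # needs >= 2 workers: the writer holds one until it sees SENTINEL
# Calling pool.map(writer, [q]) here would never return: map waits for
# writer() to finish, and writer() waits for a SENTINEL nobody has sent yet.
watcher = pool.apply_async(writer, (q,))  # returns an AsyncResult immediately
results = pool.map(Worker(q, grid), range(len(grid)))  # blocks until all tasks finish
q.put(SENTINEL)  # every worker result is already queued ahead of it
received = watcher.get(timeout=10)
pool.close()
pool.join()

print(results)        # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
print(len(received))  # 10
```

With this approach the writer occupies one pool worker until it receives the sentinel, so the pool needs at least two workers; a dedicated Process, as in the answer, avoids tying up a pool slot.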

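There were also a few other bugs in the code above, but they were incidental (QueueWriter wrote to self.log instead of the local log, Worker called self.workhorse(self, idx) instead of self.workhorse(idx), and result.get() was called on the list that pool.map returns). They are fixed below: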

import multiprocessing as mp
import os
import random

class QueueWriter(object):
    def __init__(self, **kwargs):
        self.grid = kwargs.get("grid")
        self.path = kwargs.get("path")

    def __call__(self, q):
        print self.path
        log = open(self.path, "a", 1)
        log.write("QueueWriter called.\n")
        while 1:
            res = q.get()
            if res == 'kill':
                log.write("QueueWriter received 'kill' message. Closing Writer.\n")
                break
            else:
                log.write("This is where I'd write: {0} to grid file.\n".format(res))

        log.close()
        log = None

class Worker(object):
    def __init__(self, **kwargs):
        self.queue = kwargs.get("queue")
        self.grid = kwargs.get("grid")

    def __call__(self, idx):
        res = self.workhorse(idx)
        self.queue.put((idx,res))
        return res

    def workhorse(self,idx):
        #in reality a fairly complex operation
        return self.grid[idx] ** self.grid[idx]


if __name__ == '__main__':
    # log = open(os.path.expanduser('~/minimal.log'), 'w',1)
    path = os.path.expanduser('~/minimal.log')

    pool = mp.Pool(mp.cpu_count())
    manager = mp.Manager()
    q = manager.Queue()

    grid = [random.random() for _ in xrange(10000)]
    # in actuality grid is a shared resource, read by Workers and written
    # to by QueueWriter

    qWriter = QueueWriter(grid=grid, path=path)
    # watcher = pool.map(qWriter, (q,),1)
    # Start the writer as a single process rather than a pool
    p = mp.Process(target=qWriter, args=(q,))
    p.start()
    wrkr = Worker(queue=q,grid=grid)
    result = pool.map(wrkr, range(10000), 1)
    # result.get()
    # not required for pool
    q.put('kill')
    pool.close()
    p.join()
    pool.join()
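The commented-out result.get() reflects a difference between two Pool methods: map blocks and returns a plain list, whereas map_async returns an AsyncResult whose get() method waits for and returns that list. A minimal Python 3 sketch, again using the thread-backed multiprocessing.dummy.Pool as a stand-in so it runs without extra processes (square is a hypothetical task):

```python
from multiprocessing.dummy import Pool  # thread-backed stand-in for multiprocessing.Pool


def square(x):
    """Hypothetical task function."""
    return x * x


with Pool(2) as pool:
    listed = pool.map(square, range(5))         # blocks, returns a list
    pending = pool.map_async(square, range(5))  # returns an AsyncResult at once
    later = pending.get(timeout=10)             # blocks here instead

print(type(listed).__name__)   # list
print(hasattr(listed, "get"))  # False: calling listed.get() raises AttributeError
print(later)                   # [0, 1, 4, 9, 16]
```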

Regarding python - multiprocessing: passing a class instance to pool.map, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/37055294/
