gpt4 book ai didi

python 使用 pool.map_async 时没有输出

转载 作者:行者123 更新时间:2023-11-30 23:20:50 24 4
gpt4 key购买 nike

在处理由 pool.map 调用的函数内的数据时,我遇到了非常奇怪的问题。例如,以下代码按预期工作...

import csv
import multiprocessing
import itertools
from collections import deque

cur_best = 0
d_sol = deque(maxlen=9)
d_names = deque(maxlen=9)

**import CSV Data1**

def calculate(vals):
#global cur_best
sol = sum(int(x[2]) for x in vals)
names = [x[0] for x in vals]
print(", ".join(names) + " = " + str(sol))

def process():
pool = multiprocessing.Pool(processes=4)
prod = itertools.product(([x[2], x[4], x[10]] for x in Data1))
result = pool.map_async(calculate, prod)
pool.close()
pool.join()
return result

process()

现在,当我向calculate() 添加一个简单的if 语句时,我没有得到任何输出。

   def calculate(vals):
#global cur_best
sol = sum(int(x[2]) for x in vals)
if sol > cur_best:
cur_best = sol
names = [x[0] for x in vals]
print(", ".join(names) + " = " + str(cur_best))
#would like to append cur_best and names to a deque

我尝试调整声明“cur_best”的位置,但无济于事。

在进行计算时,我试图跟踪“当前最佳”解决方案。在我的线性代码中,此逻辑驻留在嵌套的 for 循环中,并将每个新的“cur_best”附加到双端队列中。

我的新问题与 pool.map 或 pool.map_async 的工作方式有关吗?我可以不再将calculate()函数视为线性循环吗?

我还需要解决其他几个条件语句。我应该在代码的不同部分处理这个问题吗?如果是这样,具体是怎样的?

最佳答案

这里可能发生了两件事。首先,您没有看到工作函数打印任何内容的原因可能是因为它抛出了异常。由于您使用的是 map_async,因此在调用 result.get() 之前您实际上不会看到异常。但是,由于您在使用 map_async 之后立即在池上调用 close/join,因此您可能应该只使用 map 相反,它将阻塞,直到所有工作完成(或抛出异常)。我不确定为什么发生异常(您提供的代码中没有任何内容跳出),但我的猜测是您从列表中的某处提取了错误的索引。

其次,正如 Armin Rigo 指出的那样,cur_best 并非在所有进程之间共享,因此您的逻辑将无法按照您的预期方式工作。我认为最简单的选择是使用 multiprocessing.Value在共享内存中创建一个整数,所有进程都可以访问该整数。

要将获得的结果附加到双端队列,您需要使用multiprocessing.Manager创建共享双端队列。 。 Manager 生成一个服务器进程,可以管理对对象的共享访问(如 deque)。池中的每个进程(以及父进程)都可以访问 Proxy对象,它可以与Manager的进程通信以读取/写入共享对象。

这是一个显示上面讨论的所有内容的示例:

import itertools
import multiprocessing
from collections import deque
from multiprocessing.managers import BaseManager, MakeProxyType

class DequeManager(BaseManager):
pass

BaseDequeProxy = MakeProxyType('BaseDequeProxy', (
'__add__', '__contains__', '__delitem__', '__getitem__', '__len__',
'__mul__', '__reversed__', '__rmul__', '__setitem__',
'append', 'count', 'extend', 'extendleft', 'index', 'insert', 'pop',
'remove', 'reverse', 'sort', 'appendleft', 'popleft', 'rotate',
'__imul__'
))
class DequeProxy(BaseDequeProxy):
def __iadd__(self, value):
self._callmethod('extend', (value,))
return self
def __imul__(self, value):
self._callmethod('__imul__', (value,))
return self

DequeManager.register('deque', deque, DequeProxy)


cur_best = d_sol = d_names = None

def init_globals(best, sol, names):
""" This will be called in each worker process.

A global variable (cur_best) will be created in each worker.
Because it is a multiprocessing.Value, it will be shared
between each worker, too.

"""
global cur_best, d_sol, d_names
cur_best = best
d_sol = sol
d_names = names

def calculate(vals):
global cur_best
sol = sum(int(x[2]) for x in vals)
if sol > cur_best.value:
cur_best.value = sol
names = [x[0] for x in vals]
print(", ".join(names) + " = " + str(cur_best.value))
d_sol.append(cur_best.value)
d_names.append(names)
return sol

def process():
global d_sol, d_names
cur_best = multiprocessing.Value("I", 0) # unsigned int

m = DequeManager()
m.start()
d_sol = m.deque(maxlen=9)
d_names = m.deque(maxlen=9)

pool = multiprocessing.Pool(processes=4, initializer=init_globals,
initargs=(cur_best, d_sol, d_names))
prod = itertools.product([x[2], x[4], x[10]] for x in Data1)
result = pool.map(calculate, prod) # map instead of map_async
pool.close()
pool.join()
return result # Result will be a list containing the value of `sol` returned from each worker call

if __name__ == "__main__":
print(process())

关于python 使用 pool.map_async 时没有输出,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25226376/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com