
Python multiprocessing in objects


I'm writing a program in which a variable number of Agent objects run a number of serial methods concurrently and store their return values in a queue attribute. Each Agent has a Worker (a subclass of Process) as an attribute and hands it jobs to run serially through a cmd_queue; the Agent picks up the results from the Worker out of a res_queue. These are currently Manager().Queue() instances, which leads to: TypeError: Pickling an AuthenticationString object is disallowed for security reasons. If I use a regular Queue.Queue instead, however, the Worker gets a copy of the Agent's cmd_queue and never sees anything the Agent adds to it (it is always empty).

I am able to pickle instance methods using the solution referenced in this question: Can't pickle <type 'instancemethod'> when using python's multiprocessing Pool.map()

from multiprocessing import Manager, Process
from time import sleep
import types
import copy_reg

def _pickle_method(method):
    func_name = method.im_func.__name__
    obj = method.im_self
    cls = method.im_class
    return _unpickle_method, (func_name, obj, cls)

def _unpickle_method(func_name, obj, cls):
    for cls in cls.mro():
        try:
            func = cls.__dict__[func_name]
        except KeyError:
            pass
        else:
            break
    return func.__get__(obj, cls)

copy_reg.pickle(types.MethodType, _pickle_method, _unpickle_method)

class Worker(Process):
    def __init__(self, cmd_queue, res_queue):
        self.cmd_queue = cmd_queue
        self.res_queue = res_queue
        Process.__init__(self)

    def run(self):
        while True:
            f, args, kwargs = self.cmd_queue.get()
            self.res_queue.put( f(*args, **kwargs) )

class Agent:
    def __init__(self):
        self.cmd_queue = Manager().Queue()
        self.res_queue = Manager().Queue()
        self.worker = Worker(self.cmd_queue, self.res_queue)
        self.worker.start()

    def produce(self, f, *args, **kwargs):
        self.cmd_queue.put((f, args, kwargs))

    def do_some_work(self):
        self.produce(self.foo, waka='waka')

    def do_some_other_work(self):
        self.produce(self.bar, humana='humana')

    def foo(self, **kwargs):
        sleep(5)
        return('this is a foo')

    def bar(self, **kwargs):
        sleep(10)
        return('this is a bar')

    def get_results(self): #blocking call
        res = []
        while not self.cmd_queue.empty(): #wait for Worker to finish
            sleep(.5)
        while not self.res_queue.empty():
            res.append(self.res_queue.get())
        return res

#This is the interface I'm looking for.
if __name__=='__main__':
    agents = [Agent() for i in range(50)]
    #this should flow quickly as the calls are added to cmd_queues
    for agent in agents:
        agent.do_some_work()
        agent.do_some_other_work()
    for agent in agents:
        print(agent.get_results())

My question is: how can I get this code to work with multiprocessing, or is there a better, more widely accepted way of making this pattern work? This is a small piece of a larger framework, so I'd like it to stay as object-oriented-friendly as possible.

Edit: this is in Python 2.7.

Best answer

You can do this with a plain multiprocessing.Queue. You just need to adjust the Agent class so that it doesn't try to pickle the Queue instances when the Agent instance itself is pickled. This is necessary because, when you pickle the instance methods you send to the Worker, you have to pickle the Agent instance as well. It's easy to do, though:

class Agent(object): # Agent is now a new-style class
    def __init__(self):
        self.cmd_queue = Queue()
        self.res_queue = Queue()
        self.worker = Worker(self.cmd_queue, self.res_queue)
        self.worker.start()

    def __getstate__(self):
        """ This is called to pickle the instance """
        self_dict = self.__dict__.copy()
        del self_dict['cmd_queue']
        del self_dict['res_queue']
        del self_dict['worker']
        return self_dict

    def __setstate__(self, self_dict):
        """ This is called to unpickle the instance. """
        self.__dict__ = self_dict

    ... # The rest is the same.

Note that there are a couple of other logic problems in this code that keep it from working correctly; get_results doesn't really do what you expect, because it is vulnerable to a race condition:

while not self.cmd_queue.empty(): #wait for Worker to finish
    sleep(.5)
while not self.res_queue.empty():
    res.append(self.res_queue.get())

cmd_queue can (and, with your sample code, does) end up empty before the function you passed in has actually finished running inside the Worker, which means some of your results will be missing when you drain res_queue. You can fix that with a JoinableQueue, which lets the worker actually signal when it is done.
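
To illustrate just that handshake in isolation, here is a minimal sketch (separate from the code in the answer; the worker function and item count are made up for the example): the worker calls task_done() once per item it pulls off the JoinableQueue, and the producer's join() only returns once every put() has been matched by a task_done().

from multiprocessing import JoinableQueue, Process

def _do_jobs(q):
    # Pull items until the sentinel (None) arrives, marking each one done.
    for item in iter(q.get, None):
        # ... run the real job here ...
        q.task_done()
    q.task_done() # the sentinel itself also has to be marked done

if __name__ == '__main__':
    q = JoinableQueue()
    p = Process(target=_do_jobs, args=(q,))
    p.start()
    for i in range(5):
        q.put(i)
    q.put(None) # sentinel: no more work is coming
    q.join()    # blocks until task_done() has been called for every put()
    p.join()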

You should also send the worker processes a sentinel so that they shut down properly and all of their results get flushed out of res_queue and sent back to the parent correctly. I found I also needed a sentinel on res_queue, because otherwise res_queue would occasionally look empty in the parent before the last result written by the child had actually been flushed through the pipe, which meant that last result was lost.
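
And a minimal sketch of the result-side sentinel (again with made-up names rather than the answer's code): the child puts None on the result queue after its last real result, and the parent reads with iter(get, None) instead of polling empty(), so it blocks until every result has come through the pipe and nothing trailing is lost.

from multiprocessing import Process, Queue

def _produce_results(res_queue):
    # Hypothetical child: send a few results, then signal completion.
    for i in range(3):
        res_queue.put('result %d' % i)
    res_queue.put(None) # sentinel: every result has been sent

if __name__ == '__main__':
    res_queue = Queue()
    p = Process(target=_produce_results, args=(res_queue,))
    p.start()
    # get() blocks until the sentinel arrives, so the parent never races
    # an empty() check against results still sitting in the pipe buffer.
    results = [out for out in iter(res_queue.get, None)]
    p.join()
    print(results)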

Here is a complete working example:

from multiprocessing import Process, Queue, JoinableQueue
import types
from time import sleep
import copy_reg

def _pickle_method(method):
    func_name = method.im_func.__name__
    obj = method.im_self
    cls = method.im_class
    return _unpickle_method, (func_name, obj, cls)

def _unpickle_method(func_name, obj, cls):
    for cls in cls.mro():
        try:
            func = cls.__dict__[func_name]
        except KeyError:
            pass
        else:
            break
    return func.__get__(obj, cls)

copy_reg.pickle(types.MethodType, _pickle_method, _unpickle_method)

class Worker(Process):
    def __init__(self, cmd_queue, res_queue):
        self.cmd_queue = cmd_queue
        self.res_queue = res_queue
        Process.__init__(self)

    def run(self):
        for f, args, kwargs in iter(self.cmd_queue.get,
                                    (None, (), {})): # None is our sentinel
            self.res_queue.put( f(*args, **kwargs) )
            self.cmd_queue.task_done() # Mark the task as done.
        self.res_queue.put(None) # Send this to indicate no more results are coming
        self.cmd_queue.task_done() # Mark the task as done

class Agent(object):
    def __init__(self):
        self.cmd_queue = JoinableQueue()
        self.res_queue = Queue()
        self.worker = Worker(self.cmd_queue, self.res_queue)
        self.worker.start()

    def __getstate__(self):
        self_dict = self.__dict__.copy()
        del self_dict['cmd_queue']
        del self_dict['res_queue']
        del self_dict['worker']
        return self_dict

    def __setstate__(self, self_dict):
        self.__dict__ = self_dict

    def produce(self, f, *args, **kwargs):
        self.cmd_queue.put((f, args, kwargs))

    def do_some_work(self):
        self.produce(self.foo, waka='waka')

    def do_some_other_work(self):
        self.produce(self.bar, humana='humana')

    def send_sentinel(self):
        self.produce(None)

    def foo(self, **kwargs):
        sleep(2)
        return('this is a foo')

    def bar(self, **kwargs):
        sleep(4)
        return('this is a bar')

    def get_results(self): #blocking call
        res = []
        self.cmd_queue.join() # This will block until task_done has been called for every put pushed into the queue.
        for out in iter(self.res_queue.get, None): # None is our sentinel
            res.append(out)
        return res

#This is the interface I'm looking for.
if __name__=='__main__':
    agents = [Agent() for i in range(50)]
    #this should flow quickly as the calls are added to cmd_queues
    for agent in agents:
        agent.do_some_work()
        agent.do_some_other_work()
        agent.send_sentinel()
    for agent in agents:
        print(agent.get_results())

Output:

['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
['this is a foo', 'this is a bar']
... (the same line is printed once per agent, 50 times in total)

The original question and answer on Python multiprocessing in objects can be found on Stack Overflow: https://stackoverflow.com/questions/29631084/
