gpt4 book ai didi

Python multiprocessing.Queue 修改对象

转载 作者:太空狗 更新时间:2023-10-30 02:34:16 24 4
gpt4 key购买 nike

我有一个应用程序,它在 Python 中实现了类似于责任链的功能。有一个进程通过 multiprocessing.Queue() 将对象传递给其他进程,然后这些进程对对象执行操作。跟踪传递的对象的最后修改时间也很重要,因此只有在对象被修改时才能采取行动。

我遇到的问题是对象中的 _modified 属性在从队列中提取后似乎随机更改。但是,_mtime 属性始终是正确的。下面的示例将运行并(有意地)随机修改 DummyObject,然后将其放置在每个处理程序进程的队列中。然后每个处理程序将打印他们在对象中收到的 _modified 和 _mtime 值。我希望 _modified 值在 command_func 和处理程序函数中相同,但通常情况并非如此。如果我从 DummyObject 中删除 Object_w_mtime 继承,那么我在发送和接收的对象中看不到任何差异。

我对 python 比较陌生。据我所知,应该发生的事情是每次将一个对象放在队列中时,它被腌制,然后通过管道发送到接收进程,该进程取消该对象。那是对的吗?当对象被 pickled/unpickled 时,有什么方法会导致对象继承困惑吗?

我在 Ubuntu 11.10 上使用 Python 2.7.2 和 2.6.7 以及在 Ubuntu 11.04 上使用 python 2.7.1 对此进行了测试。有时您必须让它运行一分钟左右才能看到行为,因为它看起来是随机的。

在此提前致谢。

import multiprocessing
import time
import traceback
import os
import random

class Object_w_mtime(object):
'''
Parent object that tracks the last time an attribute was modified
'''
def __setattr__(self,a_name,a_value):
if ((a_name not in ('_mtime','_modified')) and
(a_value != getattr(self,a_name,None))
):
object.__setattr__(self, '_modified', True)
object.__setattr__(self, '_mtime', time.time())
object.__setattr__(self, a_name, a_value)
return True
#END def

def reset(self):
self._modified = False
#END class

class DummyObject(Object_w_mtime):
def __init__(self):
self.value = 10

def handler(in_queue = None, handler_id = None):
print 'PID:' + str(os.getpid()) + ':handler{0}:<RUN>'.format(handler_id)
while True:
try:
obj = in_queue.get(True,61)
print 'handler{} - _modified'.format(handler_id), obj._modified, ' \t_mtime', obj._mtime
except multiprocessing.queues.Empty:
break
except KeyboardInterrupt:
break
except Exception as e:
print traceback.format_exc()
break
return True
#END def

def command_func(next_links = None):
print 'PID:' + str(os.getpid()) + ':command_func:<RUN>'
obj = DummyObject()
while True:
try:
# randomly assign a different value to test with a modified and unmodified object
obj.value = random.randint(0,1)
print '**************** obj.value = {0} ***************'.format(obj.value)
print 'command_ - _modified', obj._modified, ' \t_mtime', obj._mtime
for each in next_links:
each.put(obj,False)
except multiprocessing.queues.Empty:
break
except KeyboardInterrupt:
break
except Exception as e:
print e
print traceback.format_exc()
break
obj.reset()
time.sleep(3)
return True
#END def


if __name__ == '__main__':
handler_queues = list()
handler_processes = list()
# Create a queue and process object for each command handler
for handler_id in range(1,4):
queue = multiprocessing.Queue()
process = multiprocessing.Process(target=handler, args=(queue, handler_id))
handler_queues.append(queue)
handler_processes.append(process)

try:
# spawn handler processes
for process in handler_processes:
process.start()
# Start sending commands to handlers
command_func(handler_queues)

# exit on keyboard interrupt
except KeyboardInterrupt:
for process in handler_processes:
process.join()
except Exception:
traceback.print_exc()

最佳答案

简而言之,就是把obj放入队列后修改。

查看http://svn.python.org/view/python/trunk/Lib/multiprocessing/queues.py?revision=76434&view=markup第 285 行,put() 只是将对象放入内部队列,如果尚未运行,则启动后台线程来处理来自该队列的对象。因此,您的代码中的 each.put(obj,False)obj.reset() 之间存在竞争。

您应该只使用具有不可变(副本)对象的队列。

关于Python multiprocessing.Queue 修改对象,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/8993204/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com