gpt4 book ai didi

python - 回调函数在实例中看不到正确的值

转载 作者:太空宇宙 更新时间:2023-11-03 15:20:07 26 4
gpt4 key购买 nike

我在 Python 中的回调函数和处理程序中遇到了一个奇怪的现象。我使用 ZMQ 来处理通信并使用套接字流。我有基类:

import multiprocessing    
import zmq
from concurrent.futures import ThreadPoolExecutor
from zmq.eventloop import ioloop, zmqstream
from zmq.utils import jsonapi as json

# Types of messages
TYPE_A = 'type_a'
TYPE_B = 'type_b'


class ZmqProcess(multiprocessing.Process):
def __init__(self):
super(ZmqProcess, self).__init__()
self.context = None
self.loop = None
self.handle_stream = None

def setup(self):
self.context = zmq.Context()
self.loop = ioloop.IOLoop.instance()

def send(self, msg_type, msg, host, port):
sock = zmq.Context().socket(zmq.PAIR)
sock.connect('tcp://%s:%s' % (host, port))
sock.send_json([msg_type, msg])

def stream(self, sock_type, addr):
sock = self.context.socket(sock_type)
if isinstance(addr, str):
addr = addr.split(':')
host, port = addr if len(addr) == 2 else (addr[0], None)
if port:
sock.bind('tcp://%s:%s' % (host, port))
else:
port = sock.bind_to_random_port('tcp://%s' % host)
stream = zmqstream.ZMQStream(sock, self.loop)
return stream, int(port)

class MessageHandler(object):
def __init__(self, json_load=-1):
self._json_load = json_load
self.pool = ThreadPoolExecutor(max_workers=10)

def __call__(self, msg):
i = self._json_load
msg_type, data = json.loads(msg[i])
msg[i] = data
if msg_type.startswith('_'):
raise AttributeError('%s starts with an "_"' % msg_type)
getattr(self, msg_type)(*msg)

我有一个继承它的类:

import zmq    
import zmq_base

class ZmqServerMeta(zmq_base.ZmqProcess):
def __init__(self, bind_addr, handlers):
super(ZmqServerMeta, self).__init__()
self.bind_addr = bind_addr
self.handlers = handlers

def setup(self):
super(ZmqServerMeta, self).setup()
self.handle_stream, _ = self.stream(zmq.PAIR, self.bind_addr)
self.handle_stream.on_recv(StreamHandler(self.handle_stream, self.stop,
self.handlers))

def run(self):
self.setup()
self.loop.start()

def stop(self):
self.loop.stop()

class StreamHandler(zmq_base.MessageHandler):
def __init__(self, handle_stream, stop, handlers):
super(StreamHandler, self).__init__()
self._handle_stream = handle_stream
self._stop = stop
self._handlers = handlers

def type_a(self, data):
if zmq_base.TYPE_A in self._handlers:
if self._handlers[zmq_base.TYPE_A]:
for handle in self._handlers[zmq_base.TYPE_A]:
self.pool.submit(handle, data)
else:
pass
else:
pass

def type_b(self, data):
if zmq_base.TYPE_B in self._handlers:
if self._handlers[zmq_base.TYPE_B]:
for handle in self._handlers[zmq_base.TYPE_B]:
self.pool.submit(handle, data)
else:
pass
else:
pass

def endit(self):
self._stop()

此外,我有一个类想要用作存储。这就是麻烦开始的地方:

import threading
import zmq_server_meta as server
import zmq_base as base


class Storage:
def __init__(self):
self.list = []

self.list_lock = threading.RLock()

self.zmq_server = None
self.host = '127.0.0.1'
self.port = 5432
self.bind_addr = (self.host, self.port)

def setup(self):
handlers = {base.TYPE_A: [self. remove]}
self.zmq_server = server.ZmqServerMeta(handlers=handlers, bind_addr=self.bind_addr)
self.zmq_server.start()

def add(self, data):
with self.list_lock:
try:
self.list.append(data)
except:
print "Didn't work"

def remove(self, msg):
with self.list_lock:
try:
self.list.remove(msg)
except:
print "Didn't work"

这个想法是该类存储它接收到的一些全局信息。一切都在一个文件中开始进行测试:

import sys
import time
import storage
import zmq_base as base
import zmq_server_meta as server



def printMsg(msg):
print msg

store = storage.Storage()

store.setup()
handlers = {base.TYPE_B: [printMsg]}
client = server.ZmqServerMeta(handlers=handlers, bind_addr=('127.0.0.1', 5431))
client.start()

message = "Test"

store.add(message)
client.send(base.TYPE_A, message, '127.0.0.1', 5432)

我简化了它以减少困惑。通常不是简单地添加它,而是发送它,然后返回响应。客户端发送的响应应该由正确的回调函数remove() 处理,并且应该从列表中删除一些内容。出现的问题是,remove() 函数看到一个空列表,尽管列表中应该有一个元素。如果我从测试文件中检查,我可以看到添加后的元素,如果我从那里调用remove(),我会看到一个非空列表并可以将其删除。我的问题是,为什么回调会看到一个空列表以及如何确保它确实看到列表中的正确元素?

亲切的问候帕特里克

最佳答案

我认为问题在于 ZmqProcess 类继承自 multiprocessing.Process。多处理不允许在不同进程之间共享对象,除非使用使用值或数组的共享内存映射(如文档中所示:https://docs.python.org/3/library/multiprocessing.html#sharing-state-between-processes)

如果您想使用自定义对象,可以使用服务器进程/代理对象,可以在文档的同一页面上找到该对象。

例如,您可以在 Storage 类的 init 函数中定义一个管理器,例如:self.manager = Manager()然后你输入 self.list = self.manager.list() 。这应该可以解决问题。

关于python - 回调函数在实例中看不到正确的值,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43607434/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com