gpt4 book ai didi

python - 将 ZMQStream 附加到现有的 Tornado ioloop

转载 作者:太空狗 更新时间:2023-10-30 02:32:23 45 4
gpt4 key购买 nike

我有一个应用程序,其中每个 websocket 连接(在 tornado 打开回调中)都会创建一个 zmq.SUB 套接字到现有 zmq.FORWARDER 设备。想法是从 zmq 接收数据作为回调,然后可以通过 websocket 连接中继到前端客户端。

https://gist.github.com/abhinavsingh/6378134

ws.py

import zmq
from zmq.eventloop import ioloop
from zmq.eventloop.zmqstream import ZMQStream
ioloop.install()

from tornado.websocket import WebSocketHandler
from tornado.web import Application
from tornado.ioloop import IOLoop
ioloop = IOLoop.instance()

class ZMQPubSub(object):

def __init__(self, callback):
self.callback = callback

def connect(self):
self.context = zmq.Context()
self.socket = self.context.socket(zmq.SUB)
self.socket.connect('tcp://127.0.0.1:5560')
self.stream = ZMQStream(self.socket)
self.stream.on_recv(self.callback)

def subscribe(self, channel_id):
self.socket.setsockopt(zmq.SUBSCRIBE, channel_id)

class MyWebSocket(WebSocketHandler):

def open(self):
self.pubsub = ZMQPubSub(self.on_data)
self.pubsub.connect()
self.pubsub.subscribe("session_id")
print 'ws opened'

def on_message(self, message):
print message

def on_close(self):
print 'ws closed'

def on_data(self, data):
print data

def main():
application = Application([(r'/channel', MyWebSocket)])
application.listen(10001)
print 'starting ws on port 10001'
ioloop.start()

if __name__ == '__main__':
main()

转发器.py

import zmq

def main():
try:
context = zmq.Context(1)

frontend = context.socket(zmq.SUB)
frontend.bind('tcp://*:5559')
frontend.setsockopt(zmq.SUBSCRIBE, '')

backend = context.socket(zmq.PUB)
backend.bind('tcp://*:5560')

print 'starting zmq forwarder'
zmq.device(zmq.FORWARDER, frontend, backend)
except KeyboardInterrupt:
pass
except Exception as e:
logger.exception(e)
finally:
frontend.close()
backend.close()
context.term()

if __name__ == '__main__':
main()

发布.py

import zmq

if __name__ == '__main__':
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.connect('tcp://127.0.0.1:5559')
socket.send('session_id helloworld')
print 'sent data for channel session_id'

但是,我的 ZMQPubSub 类似乎根本没有接收到任何数据。

我进一步试验并意识到我需要调用 ioloop.IOLoop.instance().start()ZMQPubSub。但是,这只会阻止执行。

我还尝试将 main.ioloop 实例传递给 ZMQStream 构造函数,但也无济于事。

有没有一种方法可以将 ZMQStream 绑定(bind)到现有的 main.ioloop 实例而不阻塞 MyWebSocket.open 中的流?

最佳答案

在您现在完成的示例中,只需将转发器中的 frontend 更改为 PULL 套接字,并将您的发布者套接字更改为 PUSH,它应该会按您的预期运行。

此处相关的套接字选择的一般原则:

  • 当您想将消息发送给准备好接收消息的所有人(可能没有人)时使用 PUB/SUB
  • 当你想将消息发送给一个节点时使用 PUSH/PULL,等待他们准备好

最初看起来您可能只需要 PUB-SUB,但是一旦您开始查看每个套接字对,您就会意识到它们非常不同。 frontend-websocket 连接绝对是 PUB-SUB - 您可能有零对多接收者,并且您只想将消息发送给消息通过时碰巧可用的每个人。但后端不同 - 只有一个接收者,它肯定需要来自发布者的每条消息。

至此,后端应该是 PULL,前端应该是 PUB。你所有的套接字:

PUSH -> [PULL-PUB] -> SUB

publisher.py:socket是PUSH,连接到device.py中的backend

forwarder.py: backendPULLfrontendPUBws.py: SUB 连接并订阅 forwarder.frontend

在您的案例中,导致后台 PUB/SUB 失败的相关行为是慢连接综合症,即 described in The Guide .本质上,订阅者需要有限的时间来告诉发布者那里有订阅,所以如果你在打开 PUB 套接字后立即发送消息,很可能还没有被告知它有任何订​​阅者,所以它只是丢弃消息。

关于python - 将 ZMQStream 附加到现有的 Tornado ioloop,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/18486988/

45 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com