gpt4 book ai didi

python - Flask + RabbitMQ + SocketIO - 转发消息

转载 作者:IT老高 更新时间:2023-10-28 20:56:05 24 4
gpt4 key购买 nike

我在通过 SocketIO 从 RabbitMQ 向用户发送消息时遇到问题。

我有 SocketIO integration 的 Flask 应用程序.当前的用户流看起来像 this

问题是我无法设置通过 SocketIO 将消息转发到浏览器的 RabbitMQ 监听器。每次我得到不同的错误。主要是连接已关闭,或者我在应用程序上下文之外工作。

我尝试了很多方法,这是我的最后一个。

# callback 
def mq_listen(uid):
rabbit = RabbitMQ()
def cb(ch, method, properties, body, mq=rabbit):
to_return = [0] # mutable
message = Message.load(body)
to_return[0] = message.get_message()

emit('report_part', {"data": to_return[0]})

rabbit.listen('results', callback=cb, id=uid)

# this is the page, which user reach
@blueprint.route('/report_result/<uid>', methods=['GET'])
def report_result(uid):

thread = threading.Thread(target=mq_listen, args=(uid,))
thread.start()

return render_template("property/report_result.html", socket_id=uid)

rabbit.listen 方法的抽象如下:

def listen(self, queue_name, callback=None, id=None):
if callback is not None:
callback_function = callback
else:
callback_function = self.__callback
if id is None:
self.channel.queue_declare(queue=queue_name, durable=True)
self.channel.basic_qos(prefetch_count=1)
self.consumer_tag = self.channel.basic_consume(callback_function, queue=queue_name)
self.channel.start_consuming()
else:
self.channel.exchange_declare(exchange=queue_name, type='direct')
result = self.channel.queue_declare(exclusive=True)
exchange_name = result.method.queue
self.channel.queue_bind(exchange=queue_name, queue=exchange_name, routing_key=id)
self.channel.basic_consume(callback_function, queue=exchange_name, no_ack=True)
self.channel.start_consuming()

导致

RuntimeError: working outside of request context

如果有任何提示或使用示例,我将很高兴。

非常感谢

最佳答案

我遇到了类似的问题,归根结底是因为当您发出请求时,flask 会将请求上下文传递给客户端。但解决方案不是添加 app.app_context()。那是 hackey 并且肯定会出现错误,因为您不是 native 发送请求上下文。

我的解决方案是创建一个重定向,以便像这样维护请求上下文:

def sendToRedisFeed(eventPerson, type):
eventPerson['type'] = type
requests.get('http://localhost:5012/zmq-redirect', json=eventPerson)

这是我的重定向函数,所以每当有一个事件我想推送到我的 PubSub 时,它都会通过这个函数,然后推送到那个 localhost 端点。

from flask_sse import sse

app.register_blueprint(sse, url_prefix='/stream')

@app.route('/zmq-redirect', methods=['GET'])
def send_message():
try:
sse.publish(request.get_json(), type='greeting')
return Response('Sent!', mimetype="text/event-stream")

except Exception as e:
print (e)
pass

现在,每当一个事件被推送到我的/zmq-redirect 端点时,它都会被重定向并通过 SSE 发布。

现在最后,只是为了总结一切,客户端:

var source = new EventSource("/stream");

source.addEventListener(
"greeting",
function(event) {
console.log(event)
}
)

关于python - Flask + RabbitMQ + SocketIO - 转发消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29461028/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com