gpt4 book ai didi

python - 使用SSE实时传递来自其他API的数据

转载 作者:行者123 更新时间:2023-11-30 21:49:42 26 4
gpt4 key购买 nike

我想要进行实时可视化,其中我有一个 Flask 应用程序将 SSE 发送到 HTML,然后 HTML 执行其操作。可视化工作完全正常。我对上证所感到困惑。有一个系统可以在“/data”处对我的应用程序执行 ping 操作以移交数据。然后我想将其转移到事件流中的可见性。

import time, json
from flask import Flask, request, Response, render_template

app =Flask(__name__)

def data_stream(data):
if data:
yield 'data: {}\n\n'.format(json.dumps(data))
else:
yield 'data: {}\n\n'.format(json.dumps({'data': [{'lat':0, 'lg':0}]}))

@app.route('/data', methods =['GET','POST'])
def collect_data():
data = {"data": [request.get_json()]}
data_stream(data)
return 'asd'


@app.route('/my_event_source', methods =['GET', 'POST'])
def sse_request():
return Response(data_stream(None), mimetype='text/event-stream')

@app.route('/')
def page():
return render_template('map_vis.html')

if __name__ == '__main__':
data2 = []
app.debug =True
app.run('0.0.0.0', 8081)

我无法找到将 data_stream 函数传递给 Response 的方法,因为到目前为止它只是简单地调用 data_stream(None) ,即我得到 {'lat':0, 'lg':0 } 作为响应。

最佳答案

您似乎正在使用 SSE,例如长轮询或其他 AJAX 派生技术; SSE 作为流更有用,因此一旦建立 SSE 连接,您需要保持数据发送功能“事件”并让它会在您需要时发送数据。

所以,我会这样做:

import Queue
data_queue = Queue.Queue()

def data_stream():
while True:
data = data_queue.get()
yield 'data: {}\n\n'.format(json.dumps(data))

当通过SSE发送数据时,我只是将数据放入队列中:

def sse_request():
r = Response(data_stream(), mimetype='text/event-stream')
data_queue.put({'data': [{'lat':0, 'lg':0}]})
return r

def collect_data():
data_queue.put({"data": [request.get_json()]})
...

唯一的是这样写会导致while循环阻塞服务器。所以你需要使用一些额外的东西,比如 geventEventlet 。有很多如何将 Flask 与这些结合起来的示例。

关于python - 使用SSE实时传递来自其他API的数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33401032/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com