gpt4 book ai didi

python - 使用 Python、Twisted 和 Flask 的服务器发送事件 : is this a correct approach for sleeping?

转载 作者:太空狗 更新时间:2023-10-30 00:07:25 26 4
gpt4 key购买 nike

我开始研究服务器发送的事件,并对使用我喜欢的工具 Python、Flask 和 Twisted 进行尝试产生了兴趣。我问的是,与 gevent 的 greenlet.sleep 做事方式相比,我的 sleep 方式是否很好,这是我采用的非常简单的代码并“移植”到 Twisted(来自 gevent):

#!/usr/bin/env python

import random
from twisted.web.server import Site
from twisted.web.wsgi import WSGIResource
from twisted.internet import reactor
import time

from flask import Flask, request, Response
app = Flask(__name__)

def event_stream():
count = 0
while True:
count += 1
yield 'data: %c (%d)\n\n' % (random.choice('abcde'), count)
time.sleep(1)


@app.route('/my_event_source')
def sse_request():
return Response(
event_stream(),
mimetype='text/event-stream')


@app.route('/')
def page():
return '''
<!DOCTYPE html>
<html>
<head>
<script type="text/javascript" src="//code.jquery.com/jquery-1.8.0.min.js"></script>
<script type="text/javascript">
$(document).ready(
function() {
sse = new EventSource('/my_event_source');
sse.onmessage = function(message) {
console.log('A message has arrived!');
$('#output').append('<li>'+message.data+'</li>');
}

})
</script>
</head>
<body>
<h2>Demo</h2>
<ul id="output"></ul>
</body>
</html>
'''


if __name__ == '__main__':
resource = WSGIResource(reactor, reactor.getThreadPool(), app)
site = Site(resource)
reactor.listenTCP(8001, site)
reactor.run()

虽然 time.sleep 是一个阻塞函数,但不会阻塞 Twisted react 器,这应该通过多个不同的浏览器可以访问页面并正确接收事件这一事实来证明:需要使用不同的浏览器以防万一,正如 Chromium 所做的那样,具有相同 URI 的多个不同请求将排队,并且由于这是一个流式响应,该浏览器队列将很忙,直到套接字或请求关闭。你的挑战是什么?有什么更好的办法吗?关于 Twisted 和 Flask 的示例代码不多..

最佳答案

您的示例仅将扭曲用作 wsgi container .与任何其他基于线程的 wsgi 容器一样,它允许您使用 time.sleep(1)

在这种情况下,允许 twisted 直接处理 /my_event_source 可能是有益的。这是来自 Using server sent events 的示例使用扭曲在 Python 中实现:

def cycle(echo):
# Every second, sent a "ping" event.
timestr = datetime.utcnow().isoformat()+"Z"
echo("event: ping\n")
echo('data: ' + json.dumps(dict(time=timestr)))
echo("\n\n")

# Send a simple message at random intervals.
if random.random() < 0.1:
echo("data: This is a message at time {}\n\n".format(timestr))

class SSEResource(resource.Resource):
def render_GET(self, request):
request.setHeader("Content-Type", "text/event-stream")
lc = task.LoopingCall(cycle, request.write)
lc.start(1) # repeat every second
request.notifyFinish().addBoth(lambda _: lc.stop())
return server.NOT_DONE_YET

客户端 static/index.html 来自 the same source :

<!doctype html>
<title>Using server-sent events</title>
<ol id="eventlist">nothing sent yet.</ol>
<script>
if (!!window.EventSource) {
var eventList = document.getElementById("eventlist");
var source = new EventSource('/my_event_source');
source.onmessage = function(e) {
var newElement = document.createElement("li");

newElement.innerHTML = "message: " + e.data;
eventList.appendChild(newElement);
}
source.addEventListener("ping", function(e) {
var newElement = document.createElement("li");

var obj = JSON.parse(e.data);
newElement.innerHTML = "ping at " + obj.time;
eventList.appendChild(newElement);
}, false);
source.onerror = function(e) {
alert("EventSource failed.");
source.close();
};
}
</script>

您可以将它与您的 wsgi 应用程序结合起来:

app = Flask(__name__)
@app.route('/')
def index():
return redirect(url_for('static', filename='index.html'))

if __name__ == "__main__":
root = resource.Resource()
root.putChild('', wsgi.WSGIResource(reactor, reactor.getThreadPool(), app))
root.putChild('static', static.File("./static"))
root.putChild('my_event_source', SSEResource())

reactor.listenTCP(8001, server.Site(root))
reactor.run()

WSGIResource 需要处理所有 url,因此需要重写路由代码以支持多个 flask 处理程序。

关于python - 使用 Python、Twisted 和 Flask 的服务器发送事件 : is this a correct approach for sleeping?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/20380066/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com