gpt4 book ai didi

python - 使用 websockets 正常关闭 uvicorn starlette 应用程序

转载 作者:行者123 更新时间:2023-12-04 04:27:29 24 4
gpt4 key购买 nike

鉴于此示例 Starlette 应用程序具有开放的 websocket 连接,您如何关闭 Starlette 应用程序?我在 uvicorn 上运行。每当我按下 Ctrl+C输出为 Waiting for background tasks to complete.永远挂起。

from starlette.applications import Starlette

app = Starlette()

@app.websocket_route('/ws')
async def ws(websocket):
await websocket.accept()

while True:
# How to interrupt this while loop on the shutdown event?
await asyncio.sleep(0.1)

await websocket.close()

我尝试在关闭事件上切换 bool 变量,但该变量永远不会更新。总是 False .

例如。
app.state.is_shutting_down = False


@app.on_event('shutdown')
async def shutdown():
app.state.is_shutting_down = True


@app.websocket_route('/ws')
async def ws(websocket):
await websocket.accept()

while app.state.is_shutting_down is False:

最佳答案

你的变量没有改变的原因是因为在所有任务都执行后(即你无限循环)执行了“关闭”事件的处理程序。
在 asyncio 事件循环上设置信号处理程序可能不起作用,因为我相信只允许 uvicorn 已经为其自己的关闭过程设置的单个信号处理程序。
相反,您可以对 uvicorn 信号处理程序进行 Monkey Patch 以检测应用程序关闭并在该新函数中设置您的控制变量。

import asyncio
from starlette.applications import Starlette
from uvicorn.main import Server

original_handler = Server.handle_exit

class AppStatus:
should_exit = False

@staticmethod
def handle_exit(*args, **kwargs):
AppStatus.should_exit = True
original_handler(*args, **kwargs)

Server.handle_exit = AppStatus.handle_exit

app = Starlette()

@app.websocket_route('/ws')
async def ws(websocket):
await websocket.accept()

while AppStatus.should_exit is False:

await asyncio.sleep(0.1)

await websocket.close()
print('Exited!')

关于python - 使用 websockets 正常关闭 uvicorn starlette 应用程序,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58133694/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com