gpt4 book ai didi

python - 从 Flask 路由进行 Python 异步调用

转载 作者:行者123 更新时间:2023-12-03 08:56:37 26 4
gpt4 key购买 nike

每次执行 Flask 路由时,我都想执行一个异步函数。为什么是abar函数从未执行?

import asyncio
from flask import Flask

async def abar(a):
print(a)

loop = asyncio.get_event_loop()
app = Flask(__name__)

@app.route("/")
def notify():
asyncio.ensure_future(abar("abar"), loop=loop)
return "OK"

if __name__ == "__main__":
app.run(debug=False, use_reloader=False)
loop.run_forever()

我还尝试将阻塞调用放在一个单独的线程中。但它仍然没有调用 abar功能。
import asyncio
from threading import Thread
from flask import Flask

async def abar(a):
print(a)

app = Flask(__name__)

def start_worker(loop):
asyncio.set_event_loop(loop)
try:
loop.run_forever()
finally:
loop.close()

worker_loop = asyncio.new_event_loop()
worker = Thread(target=start_worker, args=(worker_loop,))

@app.route("/")
def notify():
asyncio.ensure_future(abar("abar"), loop=worker_loop)
return "OK"

if __name__ == "__main__":
worker.start()
app.run(debug=False, use_reloader=False)

最佳答案

您可以将一些异步功能合并到 Flask 应用程序中,而无需将它们完全转换为 asyncio。

import asyncio
from flask import Flask

async def abar(a):
print(a)

loop = asyncio.get_event_loop()
app = Flask(__name__)

@app.route("/")
def notify():
loop.run_until_complete(abar("abar"))
return "OK"

if __name__ == "__main__":
app.run(debug=False, use_reloader=False)

这将阻止 Flask 响应,直到 async 函数返回,但它仍然允许您做一些聪明的事情。我已经使用这种模式使用 aiohttp 并行执行许多外部请求,然后当它们完成时,我又回到传统的 flask 中进行数据处理和模板渲染。
import aiohttp
import asyncio
import async_timeout
from flask import Flask

loop = asyncio.get_event_loop()
app = Flask(__name__)

async def fetch(url):
async with aiohttp.ClientSession() as session, async_timeout.timeout(10):
async with session.get(url) as response:
return await response.text()

def fight(responses):
return "Why can't we all just get along?"

@app.route("/")
def index():
# perform multiple async requests concurrently
responses = loop.run_until_complete(asyncio.gather(
fetch("https://google.com/"),
fetch("https://bing.com/"),
fetch("https://duckduckgo.com"),
fetch("http://www.dogpile.com"),
))

# do something with the results
return fight(responses)

if __name__ == "__main__":
app.run(debug=False, use_reloader=False)

关于python - 从 Flask 路由进行 Python 异步调用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47841985/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com