gpt4 book ai didi

python - 使用 anyio.TaskGroup 和 fastapi.StreamingResponse

转载 作者:行者123 更新时间:2023-12-05 05:52:49 32 4
gpt4 key购买 nike

anyiostarlette 的一部分,因此也是 FastAPI 的一部分。我发现使用它很方便 task groups在我的一个 API 服务器之外执行对外部服务的并发请求。

此外,我想在结果准备就绪后立即将结果流出。 fastapi.StreamingResponse可以做到这一点,但我仍然需要能够在返回 StreamingResponse 后保持任务组正常运行,但这听起来与 structured concurrency 的想法背道而驰。 .

使用异步生成器可能看起来像是一个显而易见的解决方案,但是 yield 通常不能在任务组上下文中使用,根据以下内容:https://trio.readthedocs.io/en/stable/reference-core.html#cancel-scopes-and-nurseries

有一个 FastAPI 服务器的例子似乎可以工作,尽管它在返回之前聚合了响应:

import anyio
from fastapi import FastAPI
from fastapi.responses import StreamingResponse


app = FastAPI()


@app.get("/")
async def root():
# What to put below?
result = await main()
return StreamingResponse(iter(result))


async def main():
send_stream, receive_stream = anyio.create_memory_object_stream()

result = []
async with anyio.create_task_group() as tg:
async with send_stream:
for num in range(5):
tg.start_soon(sometask, num, send_stream.clone())

async with receive_stream:
async for entry in receive_stream:
# What to do here???
result.append(entry)

return result


async def sometask(num, send_stream):
await anyio.sleep(1)
async with send_stream:
await send_stream.send(f'number {num}\n')



if __name__ == "__main__":
import uvicorn
# Debug-only configuration
uvicorn.run(app)

所以,问题是,在anyio中是否有类似于@trio_util.trio_async_generator的东西,或者是否可以使用@trio_util.trio_async_generator 直接使用 FastAPI?

也许还有其他解决方案?

最佳答案

import anyio
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()


@app.get("/")
async def root():
return StreamingResponse(main())


async def main():
send_stream, receive_stream = anyio.create_memory_object_stream()

async with anyio.create_task_group() as tg:
async with send_stream:
for num in range(5):
tg.start_soon(sometask, num, send_stream.clone())

async with receive_stream:
async for entry in receive_stream:
yield entry


async def sometask(num, send_stream):
async with send_stream:
for i in range(1000):
await anyio.sleep(1)
await send_stream.send(f"number {num}\n")


if __name__ == "__main__":
import uvicorn

# Debug-only configuration
uvicorn.run(app)

出乎意料的是,它起作用了。

关于python - 使用 anyio.TaskGroup 和 fastapi.StreamingResponse,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/70037576/

32 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com