gpt4 book ai didi

python - 如何在测试 FastAPI 应用程序时触发生命周期启动和关闭?

转载 作者:行者123 更新时间:2023-12-04 16:45:30 26 4
gpt4 key购买 nike

作为 FastAPI 的新手,我正在努力测试比我在教程中看到的稍微困难的代码。我像这样使用 fastapi_cache 模块和 Redis:

from fastapi import Depends, FastAPI, Query, Request
from fastapi_cache.backends.redis import CACHE_KEY, RedisCacheBackend
from fastapi_cache import caches, close_caches

app = FastAPI()

def redis_cache():
return caches.get(CACHE_KEY)

@app.get('/cache')
async def test(
cache: RedisCacheBackend = Depends(redis_cache),
n: int = Query(
...,
gt=-1
)
):
# code that uses redis cache

@app.on_event('startup')
async def on_startup() -> None:
rc = RedisCacheBackend('redis://redis')
caches.set(CACHE_KEY, rc)

@app.on_event('shutdown')
async def on_shutdown() -> None:
await close_caches()

test_main.py 看起来像这样:

import pytest
from httpx import AsyncClient
from .main import app

@pytest.mark.asyncio
async def test_cache():
async with AsyncClient(app=app, base_url="http://test") as ac:
response = await ac.get("/cache?n=150")

当我运行 pytest 时,它将 cache 变量设置为 None 并且测试失败。我想我明白为什么代码不起作用。但是我该如何修复它以正确测试我的缓存?

最佳答案

重点是 httpx 没有实现生命周期协议(protocol)并触发 startup 事件处理程序。为此,您需要使用 LifespanManager

安装:pip install asgi_lifespan

代码是这样的:

import pytest
from asgi_lifespan import LifespanManager
from httpx import AsyncClient
from .main import app


@pytest.mark.asyncio
async def test_cache():
async with LifespanManager(app):
async with AsyncClient(app=app, base_url="http://localhost") as ac:
response = await ac.get("/cache")

更多信息在这里:https://github.com/encode/httpx/issues/350

关于python - 如何在测试 FastAPI 应用程序时触发生命周期启动和关闭?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/65051581/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com