As per FastAPI's documentation:
When you declare a path operation function with normal def instead of async def, it is run in an external threadpool that is then awaited, instead of being called directly (as it would block the server).
Also, as described here:
If you are using a third party library that communicates with something (a database, an API, the file system, etc.) and doesn't have support for using await (this is currently the case for most database libraries), then declare your path operation functions as normally, with just def.
If your application (somehow) doesn't have to communicate with anything else and wait for it to respond, use async def.
If you just don't know, use normal def.
Note: You can mix def and async def in your path operation functions as much as you need and define each one using the best option for you. FastAPI will do the right thing with them.
Anyway, in any of the cases above, FastAPI will still work
asynchronously and be extremely fast.
But by following the steps above, it will be able to do some
performance optimizations.
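For instance, a minimal sketch of mixing both styles in a single application (with hypothetical endpoint paths), as the documentation permits:

import time
import asyncio
from fastapi import FastAPI

app = FastAPI()

@app.get("/sync")
def sync_endpoint():
    # normal def: runs in a thread from the external threadpool,
    # so a blocking call here does not block the event loop
    time.sleep(1)
    return {"style": "def"}

@app.get("/async")
async def async_endpoint():
    # async def: runs on the event loop; only use non-blocking
    # (awaitable) operations inside
    await asyncio.sleep(1)
    return {"style": "async def"}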
Thus, def endpoints (in the context of asynchronous programming, a function defined with just def is called a synchronous function) run, in FastAPI, in a separate thread from an external threadpool that is then awaited, and hence, FastAPI will still work asynchronously. In other words, the server will process requests to such endpoints concurrently. Whereas, async def endpoints run in the event loop—on the main (single) thread—that is, the server will also process requests to such endpoints concurrently/asynchronously, as long as there is an await call to non-blocking I/O-bound operations inside such async def endpoints/routes, such as waiting for (1) data from the client to be sent through the network, (2) contents of a file on the disk to be read, (3) a database operation to finish, etc. (have a look here). If, however, an endpoint defined with async def does not await for anything inside, in order to give up time for other tasks in the event loop to run (e.g., requests to the same or other endpoints, background tasks, etc.), each request to such an endpoint will have to be completely finished (i.e., exit the endpoint) before returning control back to the event loop and allowing other tasks to run. In other words, in such cases, the server will process requests sequentially. Note that the same concept not only applies to FastAPI endpoints, but also to a StreamingResponse's generator function (see the StreamingResponse class implementation), as well as Background Tasks (see the BackgroundTask class implementation); hence, after reading this answer to the end, you should be able to decide whether you should define a FastAPI endpoint, StreamingResponse generator, or background task function with def or async def.
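To illustrate, here is a minimal sketch (with hypothetical chunk data) of a StreamingResponse whose generator is defined with normal def, so that Starlette iterates it in the external threadpool:

import time
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()

@app.get("/stream")
def stream():
    def generate():
        # a normal def generator: Starlette iterates it in the external
        # threadpool, so the blocking sleep below does not block the event loop
        for i in range(3):
            time.sleep(1)
            yield f"chunk {i}\n"

    return StreamingResponse(generate(), media_type="text/plain")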
The keyword await (which works only within an async def function) passes function control back to the event loop. In other words, it suspends the execution of the surrounding coroutine (i.e., a coroutine object is the result of calling an async def function), and tells the event loop to let something else run, until that awaited task completes. Note that just because you may define a custom function with async def and then await it inside your async def endpoint, it doesn't mean that your code will work asynchronously, if that custom function contains, for example, calls to time.sleep(), CPU-bound tasks, non-async I/O libraries, or any other blocking call that is incompatible with asynchronous Python code. In FastAPI, for example, when using the async methods of UploadFile, such as await file.read() and await file.write(), FastAPI/Starlette, behind the scenes, actually runs such methods of File objects in an external threadpool (using the async run_in_threadpool() function) and awaits them; otherwise, such methods/operations would block the event loop. You can find out more by having a look at the implementation of the UploadFile class.
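To illustrate the pattern (this is a sketch, not Starlette's exact code), a blocking file read can be offloaded like so:

from fastapi import FastAPI, UploadFile, File
from fastapi.concurrency import run_in_threadpool

app = FastAPI()

@app.post("/upload")
async def upload(file: UploadFile = File(...)):
    # roughly what await file.read() does behind the scenes: the blocking
    # SpooledTemporaryFile.read() runs in the external threadpool and is awaited
    contents = await run_in_threadpool(file.file.read)
    return {"size": len(contents)}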
Note that async does not mean parallel, but concurrent. Asynchronous code with async and await is many times summarised as using coroutines. Coroutines are collaborative (or cooperatively multitasked), meaning that "at any given time, a program with coroutines is running only one of its coroutines, and this running coroutine suspends its execution only when it explicitly requests to be suspended" (see here and here for more info on coroutines). As described in this article:
Specifically, whenever execution of a currently-running coroutine reaches an await expression, the coroutine may be suspended, and another previously-suspended coroutine may resume execution if what it was suspended on has since returned a value. Suspension can also happen when an async for block requests the next value from an asynchronous iterator or when an async with block is entered or exited, as these operations use await under the hood.
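A minimal pure-asyncio sketch of this cooperative behaviour (with hypothetical task names):

import asyncio

async def task(name):
    for i in range(3):
        print(f"{name}: step {i}")
        await asyncio.sleep(1)  # suspends this coroutine, letting the other one resume

async def main():
    # only one coroutine runs at any instant; each yields control
    # to the event loop at every await expression
    await asyncio.gather(task("A"), task("B"))

asyncio.run(main())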
If, however, a blocking I/O-bound or CPU-bound operation was directly executed/called inside an async def function/endpoint, it would block the main thread (and hence, the event loop). Hence, a blocking operation such as time.sleep() in an async def endpoint would block the entire server (as in the code example provided in your question). Thus, if your endpoint is not going to make any async calls, you could declare it with just def instead, which would be run in an external threadpool that would then be awaited, as explained earlier (more solutions are given in the following sections). Example:
@app.get("/ping")
def ping(request: Request):
#print(request.client)
print("Hello")
time.sleep(5)
print("bye")
return "pong"
Otherwise, if the functions that you had to execute inside the endpoint are async functions that you had to await, you should define your endpoint with async def. To demonstrate this, the example below uses the asyncio.sleep() function (from the asyncio library), which provides a non-blocking sleep operation. The await asyncio.sleep() call will suspend the execution of the surrounding coroutine (until the sleep operation completes), thus allowing other tasks in the event loop to run. Similar examples are given here and here as well.
import asyncio
from fastapi import FastAPI, Request

app = FastAPI()

@app.get("/ping")
async def ping(request: Request):
    #print(request.client)
    print("Hello")
    await asyncio.sleep(5)
    print("bye")
    return "pong"
Both the endpoints above will print out the specified messages to the screen in the same order as mentioned in your question—if two requests arrived at around the same time—that is:
Hello
Hello
bye
bye
Important Note
When you call your endpoint for the second (third, and so on) time, please remember to do that from a tab that is isolated from the browser's main session; otherwise, succeeding requests (i.e., coming after the first one) will be blocked by the browser (on the client side), as the browser will be waiting for a response from the server to the previous request before sending the next one. You can confirm that by using print(request.client) inside the endpoint, where you would see the hostname and port number being the same for all incoming requests (if the requests were initiated from tabs opened in the same browser window/session), and hence those requests would be processed sequentially, because the browser sends them sequentially in the first place. To solve this, you could either:
Reload the same tab (as is running), or
Open a new tab in an Incognito Window, or
Use a different browser/client to send the request, or
Use the httpx library to make asynchronous HTTP requests, along with the awaitable asyncio.gather(), which allows executing multiple asynchronous operations concurrently and then returns a list of results in the same order the awaitables (tasks) were passed to that function (have a look at this answer for more details).
Example:
import httpx
import asyncio

URLS = ['http://127.0.0.1:8000/ping'] * 2

async def send(url, client):
    return await client.get(url, timeout=10)

async def main():
    async with httpx.AsyncClient() as client:
        tasks = [send(url, client) for url in URLS]
        responses = await asyncio.gather(*tasks)
        print(*[r.json() for r in responses], sep='\n')

asyncio.run(main())
In case you had to call different endpoints that may take different time to process a request, and you would like to print the response out on client side as soon as it is returned from the server—instead of waiting for asyncio.gather() to gather the results of all tasks and print them out in the same order the tasks were passed to the send() function—you could replace the send() function of the example above with the one shown below:
async def send(url, client):
    res = await client.get(url, timeout=10)
    print(res.json())
    return res
Async/await and Blocking I/O-bound or CPU-bound Operations
If you are required to use async def (as you might need to await for coroutines inside your endpoint), but also have some synchronous blocking I/O-bound or CPU-bound operation (long-running computation task) that will block the event loop (essentially, the entire server) and won't let other requests go through, for example:
@app.post("/ping")
async def ping(file: UploadFile = File(...)):
print("Hello")
try:
contents = await file.read()
res = cpu_bound_task(contents) # this will block the event loop
finally:
await file.close()
print("bye")
return "pong"
then:
You should check whether you could change your endpoint's definition to normal def instead of async def. For example, if the only method in your endpoint that has to be awaited is the one reading the file contents (as you mentioned in the comments section below), you could instead declare the type of the endpoint's parameter as bytes (i.e., file: bytes = File()) and thus, FastAPI would read the file for you and you would receive the contents as bytes. Hence, there would be no need to use await file.read(). Please note that the above approach should work for small files, as the entire file contents would be stored in memory (see the documentation on File Parameters); and hence, if your system does not have enough RAM available to accommodate the accumulated data (if, for example, you have 8GB of RAM, you can't load a 50GB file), your application may end up crashing. Alternatively, you could call the .read() method of the SpooledTemporaryFile directly (which can be accessed through the .file attribute of the UploadFile object), so that again you don't have to await the .read() method—and as you can now declare your endpoint with normal def, each request will run in a separate thread (examples are given below). For more details on how to upload a File, as well as how Starlette/FastAPI uses SpooledTemporaryFile behind the scenes, please have a look at this answer and this answer.
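A minimal sketch of the first approach (declaring the parameter as bytes, suitable for small files only; cpu_bound_task is the same placeholder as in the example above):

from fastapi import FastAPI, File

app = FastAPI()

@app.post("/ping")
def ping(file: bytes = File()):
    # FastAPI has already read the whole file; 'file' holds its raw bytes
    res = cpu_bound_task(file)  # placeholder for your blocking computation
    return "pong"

And the SpooledTemporaryFile approach described above: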
@app.post("/ping")
def ping(file: UploadFile = File(...)):
print("Hello")
try:
contents = file.file.read()
res = cpu_bound_task(contents)
finally:
file.file.close()
print("bye")
return "pong"
Use FastAPI's (Starlette's) run_in_threadpool() function from the concurrency module—as @tiangolo suggested here—which "will run the function in a separate thread to ensure that the main thread (where coroutines are run) does not get blocked" (see here). As described by @tiangolo here, "run_in_threadpool is an awaitable function, the first parameter is a normal function, the next parameters are passed to that function directly. It supports both sequence arguments and keyword arguments".
from fastapi.concurrency import run_in_threadpool
res = await run_in_threadpool(cpu_bound_task, contents)
Alternatively, use asyncio's loop.run_in_executor()—after obtaining the running event loop using asyncio.get_running_loop()—to run the task, which, in this case, you can await for it to complete and return the result(s), before moving on to the next line of code. Passing None as the executor argument, the default executor will be used; that is, ThreadPoolExecutor:
import asyncio
loop = asyncio.get_running_loop()
res = await loop.run_in_executor(None, cpu_bound_task, contents)
or, if you would like to pass keyword arguments instead, you could use a lambda expression (e.g., lambda: cpu_bound_task(some_arg=contents)), or, preferably, functools.partial(), which is specifically recommended in the documentation for loop.run_in_executor():
import asyncio
from functools import partial
loop = asyncio.get_running_loop()
res = await loop.run_in_executor(None, partial(cpu_bound_task, some_arg=contents))
You could also run your task in a custom ThreadPoolExecutor. For instance:
import asyncio
import concurrent.futures

loop = asyncio.get_running_loop()
with concurrent.futures.ThreadPoolExecutor() as pool:
    res = await loop.run_in_executor(pool, cpu_bound_task, contents)
In Python 3.9+, you could also use asyncio.to_thread() to asynchronously run a synchronous function in a separate thread—which, essentially, uses await loop.run_in_executor(None, func_call) under the hood, as can be seen in the implementation of asyncio.to_thread(). The to_thread() function takes the blocking function to execute, as well as any arguments (*args and/or **kwargs) to the function, and then returns a coroutine that can be awaited. Example:
import asyncio
res = await asyncio.to_thread(cpu_bound_task, contents)
ThreadPoolExecutor will successfully prevent the event loop from being blocked, but won't give you the performance improvement you would expect from running code in parallel; especially when one needs to perform CPU-bound operations, such as the ones described here (e.g., audio or image processing, machine learning, and so on). It is thus preferable to run CPU-bound tasks in a separate process—using ProcessPoolExecutor, as shown below—which, again, you can integrate with asyncio, in order to await it to finish its work and return the result(s). As described here, on Windows, it is important to protect the main loop of code to avoid recursive spawning of subprocesses, etc. Basically, your code must be under if __name__ == '__main__':.
import asyncio
import concurrent.futures

loop = asyncio.get_running_loop()
with concurrent.futures.ProcessPoolExecutor() as pool:
    res = await loop.run_in_executor(pool, cpu_bound_task, contents)
Use more workers. For example, uvicorn main:app --workers 4 (if you are using Gunicorn as a process manager with Uvicorn workers, please have a look at this answer). Note: Each worker "has its own things, variables and memory". This means that global variables/objects, etc., won't be shared across the processes/workers. In this case, you should consider using a database storage, or Key-Value stores (Caches), as described here and here. Additionally, note that "if you are consuming a large amount of memory in your code, each process will consume an equivalent amount of memory".
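As an illustration, a minimal sketch (with a hypothetical counter endpoint) of why module-level state is per-worker:

from fastapi import FastAPI

app = FastAPI()
counter = 0  # per-process: with --workers 4, each worker keeps its own copy

@app.get("/count")
def count():
    global counter
    counter += 1
    # different workers may report different values for the "same" counter
    return {"count_seen_by_this_worker": counter}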
If you need to perform heavy background computation and you don't necessarily need it to be run by the same process (for example, you don't need to share memory, variables, etc), you might benefit from using other bigger tools like Celery, as described in FastAPI's documentation.
Q :
" ... What's the problem? "
A :
The FastAPI documentation explicitly says that the framework uses in-process tasks (as inherited from Starlette).
That, by itself, means that all such tasks compete to receive (from time to time) the Python Interpreter GIL-lock - being effectively a MUTEX-terrorising Global Interpreter Lock, which in effect re-[SERIAL]-ises any and all amounts of Python Interpreter in-process threads to work as one-and-only-one-WORKS-while-all-others-stay-waiting...
On a fine-grain scale, you see the result: if spawning another handler for the second arriving http-request (manually initiated from a second FireFox tab) actually takes longer than the sleep has taken, that is the result of the GIL-lock's interleaved ~ 100 [ms] time-quanta round-robin (all-wait-while-one-can-work, ~ 100 [ms] before each next round of the GIL-lock release-acquire-roulette takes place). Since the Python Interpreter's internal work does not show more details, you may use more details (depending on O/S type or version) from here to see more in-thread LoD, like this, inside the async-decorated code being performed:
import time
import threading
from fastapi import FastAPI, Request

TEMPLATE = "INF[{0:_>20d}]: t_id( {1: >20d} ):: {2:}"

print( TEMPLATE.format( time.perf_counter_ns(),
                        threading.get_ident(),
                        "Python Interpreter __main__ was started ..."
                        )
       )
...

@app.get("/ping")
async def ping( request: Request ):
    """ __doc__
    [DOC-ME]
    ping( Request ): a mock-up AS-IS function to yield
                     a CLI/GUI self-evidence of the order-of-execution
    RETURNS:         a JSON-alike decorated dict

    [TEST-ME] ...
    """
    print( TEMPLATE.format( time.perf_counter_ns(),
                            threading.get_ident(),
                            "Hello..."
                            )
           )
    #------------------------------------------------- actual blocking work
    time.sleep( 5 )
    #------------------------------------------------- actual blocking work
    print( TEMPLATE.format( time.perf_counter_ns(),
                            threading.get_ident(),
                            "...bye"
                            )
           )
    return { "ping": "pong!" }
Last, but not least, do not hesitate to read more about all the other sharks that threads-based code may suffer from ... or even cause ... behind the curtains ...
Ad Memorandum
A mixture of GIL-lock, thread-based pools, asynchronous decorators, blocking and event-handling -- a sure mix to uncertainties & HWY2HELL ;o)
Comments
In fact, this was a trial to check why another call was running serially. The other function calls "UploadFile" and does an "await file.read()" and also runs serially. Moreover, this is run inside an Amazon server product, behind an API gateway from Amazon, and hence all of the requests come from the same IP, since the user connects to Amazon, and the Amazon server calls my API. The problem is that the operation with the file is long, and if I have this serialized, at the end I have timeouts because of the Amazon limitation. I guess I will have to go for the last link you provided!
After loading the file (an image) I do some hard processing of the image and I upload the image to AWS server (there are S3 handlers). However, there aren't any other explicit awaits in the code.
Computation task means CPU-intensive load. In CPython, threads don't give noticeable boosts for CPU tasks, because of the GIL, which allows only one thread to be active at a time. Thus, neither the def route nor run_in_threadpool will help here.
@zhanymkanov Thanks for the comment. I am aware of Python's GIL, and thus, I am planning on extending the above answer soon to provide further solutions using multiprocessing. Option 1 mentioned above (i.e., increasing the number of workers) is already one solution to this problem. Regardless, running such tasks in an external threadpool that is then awaited, instead of being called directly - although not offering a true parallelism - is better than nothing, as such tasks would otherwise block the entire server.
@bravmi You are welcome. The relevant section above has been updated; hopefully, it is now more clear. For more details, please have a look at the links provided above.