gpt4 book ai didi

python - 使用 asyncio 在 python 中每 n 秒运行一个函数

转载 作者:行者123 更新时间:2023-12-05 01:06:02 25 4
gpt4 key购买 nike

我有一个应用程序已经无限运行,异步事件循环永远运行,而且我需要每 10 秒运行一次特定函数。

def do_something():
pass

a = asyncio.get_event_loop()
a.run_forever()

我想每 10 秒调用一次函数 do_something。如何在不使用 while 循环替换 asynctio 事件循环的情况下实现这一点?

编辑:我可以用下面的代码实现这一点

def do_something():
pass
while True:
time.sleep(10)
do_something()

但我不想使用 while 循环在我的应用程序中无限运行,而是我想使用 asyncio run_forever()。那么如何使用 asyncio 每 10 秒调用一次相同的函数呢?有没有类似的调度器不会阻止我正在进行的工作?

最佳答案

asyncio 不附带内置调度程序,但构建自己的调度程序很容易。只需将 while 循环与 asyncio.sleep 结合起来,每隔几秒运行一次代码。

async def every(__seconds: float, func, *args, **kwargs):
while True:
func(*args, **kwargs)
await asyncio.sleep(__seconds)

a = asyncio.get_event_loop()
a.create_task(every(1, print, "Hello World"))
...
a.run_forever()

请注意,如果 func 本身是协程或长时间运行的子例程,则设计必须略有不同。在前一种情况下使用 await func(...) 在后一种情况下使用 asyncio's thread capabilities .

关于python - 使用 asyncio 在 python 中每 n 秒运行一个函数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/69939800/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com