gpt4 book ai didi

python - 使用 asyncio 在一个时间间隔内运行命令并在之后终止它

转载 作者:行者123 更新时间:2023-12-04 08:09:14 24 4
gpt4 key购买 nike

我正在通过 os.system() 执行 shell 命令.我计划运行它 1 秒钟,然后在超过时间时终止它。这是我作为测试尝试的。

import os, time, asyncio

async def cmd():
os.system("my.py > my.txt") # this processes longer than 1 second

@asyncio.coroutine
def run():
try:
yield from asyncio.wait_for(cmd(), 1) # this should excute for more than 1 sec, and hence raise TimeoutError
print("A")
except asyncio.TimeoutError:
os.system("killall python")
print("B")

asyncio.run(run())

但是结果总是“A”,txt是my.py写的。
我也试过:
import os, time, asyncio

async def cmd():
os.system("my.py > my.txt") # longer than 1 sec

async def run():
try:
await asyncio.wait_for(cmd(), 1) # should raise TimeoutError
print("A")
except asyncio.TimeoutError:
os.system("killall python")
print("B")

asyncio.run(run())

结果相同的输出。
代码有什么问题?我对 asyncio 很陌生,以前从未使用过它。提前致谢。很可能不是 wait_for的问题不会自动停止类型转换,因为我有一个 killall python在第二部分,实际上,wait_for 函数从不引发超时错误,这就是问题所在。

最佳答案

os.system是一个阻塞操作,所以它不能很好地与 asyncio 一起使用.每当您使用 asyncio ,所有“更长”的操作都需要异步,否则您将失去使用 asyncio 的所有好处。首先。

import asyncio

async def cmd():
try:
proc = await asyncio.create_subprocess_shell(
"sleep 2 && date > my.txt",
shell=True,
stdout=asyncio.subprocess.DEVNULL,
stderr=asyncio.subprocess.DEVNULL)
await proc.communicate()

except asyncio.CancelledError:
proc.terminate()
print("X")

async def run():
try:
await asyncio.wait_for(cmd(), 1)
print("A")

except asyncio.TimeoutError:
print("B")

asyncio.run(run())
正如@dim 在评论中提到的, asyncio.create_subprocess_exec将异步启动外部命令。 wait_for实际上会引发两个异常: TimeoutError在等待代码中,和 CancelledError在等待的代码中。我们可以使用后者来实现操作被取消,而到 terminatekill我们的子流程(因为这不是自动完成的)。

关于python - 使用 asyncio 在一个时间间隔内运行命令并在之后终止它,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/66065010/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com