gpt4 book ai didi

python - 如何使用 asyncio 使 python 装饰器函数在 n 秒内重新安排函数?

转载 作者:行者123 更新时间:2023-11-28 18:28:48 24 4
gpt4 key购买 nike

这是我当前的代码:

import asyncio
import serial


def repeat(seconds):
def wrap(func):
def decorated(*args):
loop = asyncio.get_event_loop()
loop.call_at(loop.time() + seconds, decorated, *args)
print('scheduled')
func(*args)
return func
return decorated
return wrap


@repeat(10)
def send_command(ser, text):
try:
if text == 'DATA':
print("\nSending Data Request")
ser.write(b'\n022022')
elif text == 'STAY':
print("\nInmediate Stay ARM")
ser.write(b'\n0640010002044D')
elif text == 'AWAY':
print("\nInmediate Away ARM")
ser.write(b'\n0640010003044E')
elif text == 'DISARM':
print("\nDISARM")
ser.write(b'\n0940010001040700055B')
elif text == 'CHIME':
print("\nChime toggle")
ser.write(b'\n0640010007014F')
except serial.SerialException as e:
raise e


if __name__ == '__main__':

ser = serial.Serial('/dev/ttyUSB0', 9600, parity=serial.PARITY_ODD, write_timeout=0, timeout=0)

loop = asyncio.get_event_loop()
loop.call_at(loop.time() + 5, send_command, ser, 'DATA')
loop.run_forever()

在这个例子中,我试图安排 send_command 函数在 5 秒内运行,之后每 10 秒运行一次。它似乎有效,但我对 loop.call_at(loop.time() + seconds, decorated, *args) 调用有点困惑。任何意见将不胜感激。

最佳答案

你的代码很好(除了你应该在装饰器中返回 func 的结果),我做了一些改进。

使用合成示例更容易查看和测试:

import asyncio
import functools


def repeat(seconds):
def wrap(func):
@functools.wraps(func) # see http://stackoverflow.com/q/308999/1113207
def decorated(*args, **kwargs):
# We should call func that decorated again to
# force decorator's `call_at` code to execute again:
loop = asyncio.get_event_loop()
loop.call_at(
loop.time() + seconds,
functools.partial(decorated, *args, **kwargs) # see http://stackoverflow.com/q/3252228/1113207
)
# We should return result of func's excecution:
return func(*args, **kwargs)
return decorated
return wrap


@repeat(2)
def send_command():
print('send_command')


async def main():
send_command() # call once, rescheduling started

for i in range(10):
print(i)
await asyncio.sleep(1)


if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(main())

输出:

send_command
0
1
send_command
2
3
send_command
4
5
send_command
6
7
send_command
8
9
send_command
[Finished in 10.3s]

关于python - 如何使用 asyncio 使 python 装饰器函数在 n 秒内重新安排函数?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39090875/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com