gpt4 book ai didi

python - 如何使用 rxpython 使长时间运行的程序超时?

转载 作者:太空狗 更新时间:2023-10-29 17:20:55 24 4
gpt4 key购买 nike

假设我有一个看起来像这样的长时间运行的 python 函数?

import random
import time
from rx import Observable
def intns(x):
y = random.randint(5,10)
print(y)
print('begin')
time.sleep(y)
print('end')
return x

我希望能够将超时设置为 1000ms

所以我正在做类似的事情,创建一个 observable 并通过上面的密集计算映射它。

a = Observable.repeat(1).map(lambda x: intns(x))

现在对于发出的每个值,如果它花费的时间超过 1000 毫秒,我想在到达 1000ms 时立即使用 on_erroron_completed< 结束 observable/

a.timeout(1000).subscribe(lambda x: print(x), lambda x: print(x))

上面的语句确实超时了,调用了on_error,但是它继续计算完密集的计算,然后才返回到下一条语句。有更好的方法吗?

最后一条语句打印如下

8 # no of seconds to sleep
begin # begins sleeping, trying to emit the first value
Timeout # operation times out, and calls on_error
end # thread waits till the function ends

我的想法是,如果一个特定的函数超时,我希望能够继续我的程序,并忽略结果。

我想知道intns函数是不是在一个单独的线程上完成的,我猜超时后主线程继续执行,但我还是想停止计算intns函数在一个线程上,或者以某种方式杀死它。

最佳答案

下面是一个可以使用 with timeout() 调用的类:

如果代码下的 block 运行时间超过指定时间,则会引发 TimeoutError

import signal

class timeout:
# Default value is 1 second (1000ms)
def __init__(self, seconds=1, error_message='Timeout'):
self.seconds = seconds
self.error_message = error_message
def handle_timeout(self, signum, frame):
raise TimeoutError(self.error_message)
def __enter__(self):
signal.signal(signal.SIGALRM, self.handle_timeout)
signal.alarm(self.seconds)
def __exit__(self, type, value, traceback):
signal.alarm(0)

# example usage
with timeout() :
# infinite while loop so timeout is reached
while True :
pass

如果我能理解您的功能,那么您的实现应该是这样的:

def intns(x):
y = random.randint(5,10)
print(y)
print('begin')
with timeout() :
time.sleep(y)
print('end')
return x

关于python - 如何使用 rxpython 使长时间运行的程序超时?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45226218/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com