gpt4 book ai didi

python - Python 中的函数属性

转载 作者:行者123 更新时间:2023-11-30 23:14:55 24 4
gpt4 key购买 nike

我问这个问题是 Limit function execution in Python 的延续

我找到了一种无需线程等的方法。只需时不时进行简单检查即可。

这是我的装饰器:

def time_limit(seconds):
def decorator(func):
func.info = threading.local()

def check_timeout():
if hasattr(func.info, 'end_time'):
if time.time() > func.info.end_time:
raise TimeoutException

func.check_timeout = check_timeout

@functools.wraps(func)
def wrapper(*args, **kwargs):
if not hasattr(func.info, 'end_time'):
func.info.end_time = time.time() + seconds
return func(*args, **kwargs)

return wrapper

return decorator

及用法:

@time_limit(60)
def algo():
do_something()
algo.check_timeout()
do_something_else()

它在本地主机上工作正常,但在带有 mod_wsgi 和 django 的 apache 服务器上失败。

  1. 第一个问题。注意到hasattr了吗?我应该添加它,因为有时我会收到错误 '_thread.local' has no attribute end_time
  2. 为什么我需要 threading.local?正如 @Graham Dumpleton 指出的那样,我们不能有一个全局结束时间,因为后续请求将进入并覆盖它。因此,如果第一个请求尚未完成,其 end_time 将重置为为后续请求设置的值。问题是这种方法没有帮助。假设我有以下运行训练。

首次运行 - 在超时发生之前 - 运行完美

第二次运行 - 在超时发生之前 - 运行完美

第三次运行 - 发生超时 - 引发 TimeoutException

无论是否发生,所有后续调用都会引发 TimeoutException

似乎所有后续调用都会查看第三次运行的 end_time 副本,并且由于存在超时,它们也会引发超时

如何本地化每个函数调用的 end_time?谢谢。

编辑:感谢@miki725和@Antti Haapala,我简化了我的函数并使其成为一个简单的类:

class TimeLimit(object):
def __init__(self, timeout=60):
self.timeout = timeout
self.end = None

def check_timeout(self):
if self.end and time.time() > self.end:
raise TimeoutException
else:
self.start()

def start(self):
if not self.end:
self.end = time.time() + self.timeout

但是,将计时器传递给函数对我来说非常不方便,因为algo实际上是非常复杂的递归函数。所以,我做了以下事情:

timer = TimeLimit() # fails. I think because it is global
def algo()
do_stuff()
timer.check_timeout()
do_another_stuff()
sub_algo() # check inside it to
algo()
...

有什么方法可以使timer线程安全。伪私有(private)_timer有什么帮助吗?

最佳答案

问题是您在函数对象本身上添加了 end_time 。由于每个线程都会导入所有 Python 模块,因此实际上您只需将 end_time n 设置为您正在运行的线程数(这似乎是您的情况 2) .

要解决这个问题,您可以始终在每个线程中设置end_time,但是这对我来说似乎并不优雅,因为您对将要执行的内容做出了一些假设。

其他解决方案是使用类。这将允许在类实例中保留状态,因此不会发生此问题。

class ExecuteWithTimeout(object):
def __init__(self, to_execute, timeout):
self.to_execute = to_execute
self.timeout = timeout
self.end = None

def check_timeout(self):
if time.time() > self.end:
raise TimeoutException

def __call__(self, *args, **kwargs):
self.end = time.time() + self.timeout
result = self.to_execute(*args, **kwargs)
self.check_timeout()
return result

def usage():
stuff = ExecuteWithTimeout(do_something, 10)()
do_something_else(stuff)

另一种方法是使用上下文管理器:

@contextmanager
def timeout_limit(timeout):
end = time.time() + self.timeout
yield
if time.time() > end:
raise TimeoutException

def usage():
with timeout_limit(10):
do_stuff()
more_things()

或者更好的是,您可以将两者结合起来!

class TimeLimit(object):
def __init__(self, timeout=60):
self.timeout = timeout
self.end = None

def __enter__(self):
self.end = time.time() + self.timeout
return self

def __exit__(self, exc_type, exc_val, exc_tb):
self.check_timeout()

def check_timeout(self):
if self.end and time.time() > self.end:
raise TimeoutException

def algo():
with TimeLimit(2) as timer:
time.sleep(1)
timer.check_timeout()
time.sleep(1)
timer.check_timeout()
<小时/>

更新您的更新:

timer = TimeLimit() # fails. I think because it is global
def algo():
...

使用上面的类对您没有帮助,因为该类将是一个线程级实例,这会让您回到最初的问题。问题在于保持线程级状态,因此将其存储在类中还是作为函数对象属性并不重要。如果内部函数需要,您的函数应该显式地将状态传递给这些函数。您不应依赖使用全局状态来执行此操作:

def algo():
with TimeLimit(2) as timer:
do_stuff(timer)
timer.check_timeout()
do_more_stuff(timer)
timer.check_timeout()

关于python - Python 中的函数属性,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28518720/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com