gpt4 book ai didi

python - atexit:如何手动触发它?

转载 作者:行者123 更新时间:2023-11-30 21:55:06 24 4
gpt4 key购买 nike

当我运行 python 脚本时,我可以退出解释器,并且 atexit 将执行我注册的所有函数。

现在,我正在使用 Airflow ,并希望触发 atexit 任务 on_kill() (即,当我清除或杀死 Airflow 中的 dag 节点时)。

例如,在伪代码中,我需要能够:

class Foo(PythonOperator):
...
def on_kill():
# somehow, trigger atexit tasks without exiting the
# process entirely

atexit 也不是必需的——我可以做点别的事情。主要的一点是,在 python 上下文之外执行的某些内容需要以程序方式终止,理想情况下,通过引用外壳脚本来传递终止函数将是最后的手段(Python 并没有使这个特定的替代方案变得容易) 。

最佳答案

您可以对 atexit 模块进行猴子修补 - 像这样:

import atexit
from queue import LifoQueue


save_register = atexit.register
atexit_queue = LifoQueue()

def my_register(func, *args, **kwargs):
save_register(func, *args, **kwargs)
atexit_queue.put((func, args, kwargs))

atexit.register = my_register


if __name__ == '__main__':

def func1():
print('func1() called')

def func2(arg):
print(f'func2({arg}) called')

def func3(arg, kwarg1=1, kwarg2='foo'):
print(f'func3({arg}, kwarg1={kwarg1}, kwarg2={kwarg2!r}) called')

atexit.register(func1)
atexit.register(func2, 1)
atexit.register(func3, 2, kwarg1=42, kwarg2='bar')

print('Calling queued atexit functions:\n')
while atexit_queue.qsize():
func, args, kwargs = atexit_queue.get()
atexit.unregister(func) # Prevent it from being called again.
func(*args, **kwargs)

输出:

Calling queued atexit functions:

func3(2, kwarg1=42, kwarg2='bar') called
func2(1) called
func1() called

关于python - atexit:如何手动触发它?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57633815/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com