gpt4 book ai didi

Airflow 获取重试次数

转载 作者:行者123 更新时间:2023-12-02 12:09:17 25 4
gpt4 key购买 nike

在我的 Airflow DAG 中,我有一个任务需要知道它是第一次运行还是重试运行。如果是重试尝试,我需要调整任务中的逻辑。

我对如何存储任务的重试次数有一些想法,但我不确定它们是否合法,或者是否有更简单的内置方法可以在任务中获取此信息。

  • 我想知道是否可以在每次任务运行时附加的 dag 中包含一个整数变量。然后,如果任务重新运行,我可以检查变量的值以查看它是否大于 1,因此将重试运行。但我不确定可变全局变量在 Airflow 中是否以这种方式工作,因为可以有多个工作人员执行不同的任务(不过我不确定这一点)。

  • 将其写入 XCOM 变量中吗?

最佳答案

重试次数可从任务实例中获得,可通过宏 {{ task_instance }} 获得。 https://airflow.apache.org/code.html#default-variables

如果您使用的是 python 运算符,只需将 provide_context=True, 添加到您的运算符 kwargs,然后在可调用中执行 kwargs['task_instance'].try_number

否则你可以这样做:

t = BashOperator(
task_id='try_number_test',
bash_command='echo "{{ task_instance.try_number }}"',
dag=dag)

编辑:

当任务实例被清除时,会将max_retry次数设置为当前的try_number + 重试值。所以你可以这样做:

ti = # whatever method you do to get the task_instance object
is_first = ti.max_tries - ti.task.retries + 1 == ti.try_number

Airflow 在运行时会将 try_number 增加 1,因此我想您在从配置的重试值中减去 max_tries 时需要 + 1。但我没有测试来确认

关于 Airflow 获取重试次数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51756289/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com