gpt4 book ai didi

python - Airflow ,标记任务成功或在 dag 运行之前跳过它

转载 作者:行者123 更新时间:2023-11-28 17:06:02 25 4
gpt4 key购买 nike

我们有一个巨大的 DAG,其中有许多小而快的任务和一些大而耗时的任务。

我们只想运行 DAG 的一部分,我们发现最简单的方法是不添加我们不想运行的任务。问题是我们的 DAG 有很多相互依赖关系,所以当我们想跳过一些任务时不破坏 dag 就成了一个真正的挑战。

有没有办法默认为任务添加状态? (对于每次运行),类似于:

# get the skip list from a env variable    
task_list = models.Variable.get('list_of_tasks_to_skip')

dag.skip(task_list)

for task in task_list:
task.status = 'success'

最佳答案

如评论中所述,您应该使用 BranchPythonOperator(或 ShortCircuitOperator)来防止执行耗时的任务。如果您需要运行这些耗时任务的下游运算符,您可以使用 TriggerRule.ALL_DONE 运行这些运算符,但请注意,即使上游运算符失败,它也会运行。

您可以使用 Airflow 变量来影响这些 BranchPythonOperators 而无需更新 DAG,例如:

from airflow.models import Variable

def branch_python_operator_callable()
return Variable.get('time_consuming_operator_var')

并使用 branch_python_operator_callable 作为 BranchPythonOperator 的 Python 可调用对象。

关于python - Airflow ,标记任务成功或在 dag 运行之前跳过它,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50934266/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com