gpt4 book ai didi

airflow - dag 中 Airflow 任务的状态

转载 作者:太空狗 更新时间:2023-10-29 21:21:47 24 4
gpt4 key购买 nike

我需要任务的状态,比如它是在运行还是正在重试或在同一个 dag 中失败。所以我尝试使用下面的代码获取它,尽管我没有输出...

Auto = PythonOperator(
task_id='test_sleep',
python_callable=execute_on_emr,
op_kwargs={'cmd':'python /home/hadoop/test/testsleep.py'},
dag=dag)

logger.info(Auto)

目的是在 Airflow 上的特定任务完成后终止某些正在运行的任务。

问题是我如何获取任务的状态,比如它是处于运行状态还是失败或成功

最佳答案

我正在做类似的事情。如果另一项任务的前 10 次运行成功,我需要检查一项任务。taky2 让我走上了正确的道路。这实际上相当简单:

from airflow.models import TaskInstance
ti = TaskInstance(*your_task*, execution_date)
state = ti.current_state()

因为我想在 dag 中检查,所以不需要指定 dag。我只是创建了一个函数来循环过去的 n_days 并检查状态。

def check_status(**kwargs):
last_n_days = 10
for n in range(0,last_n_days):
date = kwargs['execution_date']- timedelta(n)
ti = TaskInstance(*my_task*, date) #my_task is the task you defined within the DAG rather than the task_id (as in the example below: check_success_task rather than 'check_success_days_before')
state = ti.current_state()
if state != 'success':
raise ValueError('Not all previous tasks successfully completed.')

当您调用该函数时,请务必设置 provide_context。

check_success_task = PythonOperator(
task_id='check_success_days_before',
python_callable= check_status,
provide_context=True,
dag=dag
)

更新:当你想调用另一个 dag 的任务时,你需要这样调用它:

from airflow import configuration as conf
from airflow.models import DagBag, TaskInstance

dag_folder = conf.get('core','DAGS_FOLDER')
dagbag = DagBag(dag_folder)
check_dag = dagbag.dags[*my_dag_id*]
my_task = check_dag.get_task(*my_task_id*)
ti = TaskInstance(my_task, date)

显然现在还有一个 api 调用在做同样的事情:

from airflow.api.common.experimental.get_task_instance import get_task_instance
ti = get_task_instance(*my_dag_id*, *my_task_id*, date)

关于airflow - dag 中 Airflow 任务的状态,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43732642/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com