gpt4 book ai didi

python - 在 Airflow 中的后续任务中获取先前任务 ID 的名称?

转载 作者:行者123 更新时间:2023-12-05 04:58:21 25 4
gpt4 key购买 nike

<分区>

在 Airflow 中,我正在编写一个包含多个任务的 DAG,到目前为止都是 PythonOperators。从 t1 我想在 xcom 字典中存储一个变量,然后在 t2 的函数中我想访问该变量而不显式调用任务名称(这需要在 t2 函数中对任务名称进行硬编码)。所以我的计划是访问 context['ti'] 并使用具有属性 task_id_get_previous_ti()。这看起来像我想要的,但它绝对不适合我。

我试过这个:

from airflow.models import DAG, Variable
from airflow.utils.dates import days_ago
from airflow.operators.python_operator import PythonOperator

def task1(**context):
return 'TASK 1 RESULT'

def task2(**context):
previous_ti = context['ti']._get_previous_ti()
print("Previous TI: ", previous_ti)
previous_ti_id = previous_ti.task_id
print("Previous task_id: ", previous_ti_id)

# use previous_ti_id to access context['ti'].xcom.pull(previous_ti_id)
return

default_args = {
"owner": "me",
"start_date": days_ago(1)
}

dag = DAG(
dag_id='some_test',
default_args=default_args,
schedule_interval=None)


with dag:

t1 = PythonOperator(
task_id = "task_1_testing",
python_callable=task1,
provide_context=True)

t2 = PythonOperator(
task_id = "task_2_testing",
python_callable=task2,
provide_context=True)

t1 >> t2

但这会产生奇怪的结果:当我第一次测试它时,airflow 已经在运行并且它似乎正在引用先前触发的 dag 运行(?)的任务实例。当我退出 Airflow 并使用这段代码重新启动它时,它给了我错误的爆炸屏幕:AttributeError: 'NoneType' object has no attribute 'dag_id'

我真正想要的是在 t2 函数中创建一个名为 previous_ti_id 的变量,在本例中它会返回等于 task_1_testing .可能吗?

我找到了 this previous question ,但我对 Airflow 的了解还不够,无法确定这是否相关(虽然看起来并不相关)。我将不胜感激。

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com