gpt4 book ai didi

python - Airflow : Passing a dynamic value to Sub DAG operator

转载 作者:太空狗 更新时间:2023-10-30 00:09:24 27 4
gpt4 key购买 nike

我是 Airflow 的新手。
我遇到过一个场景,父 DAG 需要将一些动态数字(比如 n)传递给子 DAG。
SubDAG 将使用此数字动态创建 n 并行任务。

Airflow 文档未涵盖实现此目的的方法。所以我探索了几种方法:

选项 - 1(使用 xcom Pull)

我曾尝试作为 xcom 值传递,但出于某种原因,SubDAG 未解析为传递的值。

父 Dag 文件

def load_dag(**kwargs):
number_of_runs = json.dumps(kwargs['dag_run'].conf['number_of_runs'])
dag_data = json.dumps({
"number_of_runs": number_of_runs
})
return dag_data

# ------------------ Tasks ------------------------------
load_config = PythonOperator(
task_id='load_config',
provide_context=True,
python_callable=load_dag,
dag=dag)


t1 = SubDagOperator(
task_id=CHILD_DAG_NAME,
subdag=sub_dag(PARENT_DAG_NAME, CHILD_DAG_NAME, default_args, "'{{ ti.xcom_pull(task_ids='load_config') }}'" ),
default_args=default_args,
dag=dag,
)

子目录文件

def sub_dag(parent_dag_name, child_dag_name, args, num_of_runs):
dag_subdag = DAG(
dag_id='%s.%s' % (parent_dag_name, child_dag_name),
default_args=args,
schedule_interval=None)

variabe_names = {}

for i in range(num_of_runs):
variabe_names['task' + str(i + 1)] = DummyOperator(
task_id='dummy_task',
dag=dag_subdag,
)

return dag_subdag

选项 - 2

我还尝试将 number_of_runs 作为全局变量传递,但没有成功。

选项 - 3

我们还尝试将此值写入数据文件。但是子 DAG 抛出 File doesn't exist error。这可能是因为我们正在动态生成此文件。

有人可以帮我解决这个问题吗?

最佳答案

我已经使用选项 3 完成了它。关键是如果文件不存在,则返回一个没有任务的有效 dag。因此 load_config 将生成一个文件,其中包含您的任务数量或更多信息(如果需要)。你的 subdag 工厂看起来像:

def subdag(...):
sdag = DAG('%s.%s' % (parent, child), default_args=args, schedule_interval=timedelta(hours=1))
file_path = "/path/to/generated/file"
if os.path.exists(file_path):
data_file = open(file_path)
list_tasks = data_file.readlines()
for task in list_tasks:
DummyOperator(
task_id='task_'+task,
default_args=args,
dag=sdag,
)
return sdag

在 dag 生成时,您将看到一个没有任务的子标签。 dag执行时,load_config完成后,可以看到动态生成的subdag

关于python - Airflow : Passing a dynamic value to Sub DAG operator,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44365716/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com