gpt4 book ai didi

带有配置/参数 json 的 Airflow DAG 并循环到该参数以生成运算符

转载 作者:行者123 更新时间:2023-12-04 14:12:59 24 4
gpt4 key购买 nike

我有一个手动触发的 dag。它需要一个参数,如:

{"id_list":"3,5,1"}

在 DAG 中,我根据这个整数列表动态创建运算符:

for id in id_list:
task = create_task(id)

我需要根据id_list 的参数值来初始化id_list。我该如何初始化该列表,因为当不在模板化字段中时我无法直接引用该参数?这就是我希望在图形 View 中看到它的方式,其中流程任务基于 id_list 参数。

enter image description here

我见过动态创建任务的例子,但从列表值是硬编码的意义上来说,它们并不是真正的动态。如果有意义,任务是根据硬编码值列表动态创建的。

最佳答案

首先,创建固定数量的任务来执行。此示例使用 PythonOperator。在python_callable中,如果index小于param_list的长度则执行else raise AirflowSkipException

        def execute(index, account_ids):
param_list = account_ids.split(',')
if index < len(param_list):
print(f"execute task index {index}")
else:
raise AirflowSkipException


def create_task(task_id, index):
return PythonOperator(task_id=task_id,
python_callable=execute,
op_kwargs={
"index": index,
"account_ids": "{{ dag_run.conf['account_ids'] }}"}
)

record_size_limit = 5
ACCOUNT_LIST = [None] * record_size_limit

for idx in range(record_size_limit):
task = create_task(f"task_{idx}", idx)
task

触发 DAG 并将其作为参数传递:

enter image description here

图 TableView :

enter image description here

关于带有配置/参数 json 的 Airflow DAG 并循环到该参数以生成运算符,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62885881/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com