gpt4 book ai didi

airflow - 使用 XCOM 值在 Airflow 中创建动态工作流

转载 作者:行者123 更新时间:2023-12-01 23:28:14 41 4
gpt4 key购买 nike

现在,我使用这样的变量创建了多个任务,它工作正常。

with DAG(....) as dag:
body = Variable.get("config_table", deserialize_json=True)
for i in range(len(body.keys())):
simple_task = Operator(
task_id = 'task_' + str(i),
.....

但出于某种原因我需要使用 XCOM 值而不是使用变量。是否可以动态创建具有 XCOM 拉取值的任务?

我尝试像这样设置值,但它不起作用

body = "{{ ti.xcom_pull(key='config_table', task_ids='get_config_table') }}"

最佳答案

可以从以前任务生成的 XComs 动态创建任务,关于这个主题有更广泛的讨论,例如在这个 question 中.建议的方法之一遵循此结构,这是我制作的一个工作示例:

示例文件.json:

{
"cities": [ "London", "Paris", "BA", "NY" ]
}
  • 从 API、文件或任何来源获取数据。将其推送为 XCom

def _process_obtained_data(ti):
list_of_cities = ti.xcom_pull(task_ids='get_data')
Variable.set(key='list_of_cities',
value=list_of_cities['cities'], serialize_json=True)

def _read_file():
with open('dags/sample_file.json') as f:
data = json.load(f)
# push to XCom using return
return data


with DAG('dynamic_tasks_example', schedule_interval='@once',
start_date=days_ago(2),
catchup=False) as dag:

get_data = PythonOperator(
task_id='get_data',
python_callable=_read_file)
  • 添加第二个任务,该任务将从 XCom 的 pull 中提取,并使用稍后将用于迭代的数据设置一个 Variable
    preparation_task = PythonOperator(
task_id='preparation_task',
python_callable=_process_obtained_data)

*当然,如果您愿意,可以将两个任务合并为一个。我不想这样做,因为通常情况下,我会使用获取的数据的一个子集来创建 Variable

  • 从该 Variable 中读取,然后对其进行迭代。定义 default_var关键
    end = DummyOperator(
task_id='end',
trigger_rule='none_failed')

# Top-level code within DAG block
iterable_list = Variable.get('list_of_cities',
default_var=['default_city'],
deserialize_json=True)
  • 在循环中声明动态任务及其依赖关系。使 task_id 唯一。 TaskGroup 是可选的,可帮助您对 UI 进行排序。

with TaskGroup('dynamic_tasks_group',
prefix_group_id=False,
) as dynamic_tasks_group:
if iterable_list:
for index, city in enumerate(iterable_list):
say_hello = PythonOperator(
task_id=f'say_hello_from_{city}',
python_callable=_print_greeting,
op_kwargs={'city_name': city, 'greeting': 'Hello'}
)
say_goodbye = PythonOperator(
task_id=f'say_goodbye_from_{city}',
python_callable=_print_greeting,
op_kwargs={'city_name': city, 'greeting': 'Goodbye'}
)

# TaskGroup level dependencies
say_hello >> say_goodbye

# DAG level dependencies
get_data >> preparation_task >> dynamic_tasks_group >> end

DAG 图 View :

DAG in the UI

导入:

import json
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.models import Variable
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy import DummyOperator
from airflow.utils.task_group import TaskGroup

注意事项:

  • 如果您有同一个 DAG 的同时 dag_run,它们都将使用相同的变量,因此您可能需要通过区分它们的名称使其“唯一”。
  • 您必须在读取Variable 时设置默认值,否则,Scheduler 可能无法处理第一次执行。
  • Airflow 图形 View UI 可能不会立即刷新更改。尤其是在从创建动态任务生成的可迭代对象中添加或删除项目后的第一次运行中。
  • 如果您需要读取多个变量,请务必记住,建议将它们存储在一个 JSON 值中,以避免不断创建与元数据数据库的连接(此 article 中的示例)。

祝你好运!

编辑:

另一个需要考虑的重点:

  • 使用这种方法,对Variable.get() 方法的调用是top-level code ,因此调度程序每 30 秒读取一次(min_file_process_interval 设置的默认值)。这意味着每次都会连接到元数据数据库。

编辑:

  • 添加了 if 子句来处理空的 iterable_list 情况。

关于airflow - 使用 XCOM 值在 Airflow 中创建动态工作流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/66820948/

41 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com