- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
现在,我使用这样的变量创建了多个任务,它工作正常。
with DAG(....) as dag:
body = Variable.get("config_table", deserialize_json=True)
for i in range(len(body.keys())):
simple_task = Operator(
task_id = 'task_' + str(i),
.....
但出于某种原因我需要使用 XCOM 值而不是使用变量。是否可以动态创建具有 XCOM 拉取值的任务?
我尝试像这样设置值,但它不起作用
body = "{{ ti.xcom_pull(key='config_table', task_ids='get_config_table') }}"
最佳答案
可以从以前任务生成的 XComs
动态创建任务,关于这个主题有更广泛的讨论,例如在这个 question 中.建议的方法之一遵循此结构,这是我制作的一个工作示例:
示例文件.json:
{
"cities": [ "London", "Paris", "BA", "NY" ]
}
XCom
。
def _process_obtained_data(ti):
list_of_cities = ti.xcom_pull(task_ids='get_data')
Variable.set(key='list_of_cities',
value=list_of_cities['cities'], serialize_json=True)
def _read_file():
with open('dags/sample_file.json') as f:
data = json.load(f)
# push to XCom using return
return data
with DAG('dynamic_tasks_example', schedule_interval='@once',
start_date=days_ago(2),
catchup=False) as dag:
get_data = PythonOperator(
task_id='get_data',
python_callable=_read_file)
XCom
的 pull 中提取,并使用稍后将用于迭代的数据设置一个 Variable
。 preparation_task = PythonOperator(
task_id='preparation_task',
python_callable=_process_obtained_data)
*当然,如果您愿意,可以将两个任务合并为一个。我不想这样做,因为通常情况下,我会使用获取的数据的一个子集来创建 Variable
。
Variable
中读取,然后对其进行迭代。定义 default_var
是关键。 end = DummyOperator(
task_id='end',
trigger_rule='none_failed')
# Top-level code within DAG block
iterable_list = Variable.get('list_of_cities',
default_var=['default_city'],
deserialize_json=True)
task_id
唯一。 TaskGroup
是可选的,可帮助您对 UI 进行排序。
with TaskGroup('dynamic_tasks_group',
prefix_group_id=False,
) as dynamic_tasks_group:
if iterable_list:
for index, city in enumerate(iterable_list):
say_hello = PythonOperator(
task_id=f'say_hello_from_{city}',
python_callable=_print_greeting,
op_kwargs={'city_name': city, 'greeting': 'Hello'}
)
say_goodbye = PythonOperator(
task_id=f'say_goodbye_from_{city}',
python_callable=_print_greeting,
op_kwargs={'city_name': city, 'greeting': 'Goodbye'}
)
# TaskGroup level dependencies
say_hello >> say_goodbye
# DAG level dependencies
get_data >> preparation_task >> dynamic_tasks_group >> end
DAG 图 View :
导入:
import json
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.models import Variable
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy import DummyOperator
from airflow.utils.task_group import TaskGroup
注意事项:
DAG
的同时 dag_run,它们都将使用相同的变量,因此您可能需要通过区分它们的名称使其“唯一”。Variable
时设置默认值,否则,Scheduler
可能无法处理第一次执行。祝你好运!
编辑:
Variable.get()
方法的调用是top-level code ,因此调度程序每 30 秒读取一次(min_file_process_interval
设置的默认值)。这意味着每次都会连接到元数据数据库。编辑:
iterable_list
情况。关于airflow - 使用 XCOM 值在 Airflow 中创建动态工作流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/66820948/
自过去 6 个月以来,我一直在使用 Airlfow。我很高兴在 Airflow 中定义工作流程。 我有以下场景,我无法获得 xcom 值(以黄色突出显示)。 请在以下示例代码中找到代码: 工作流程 d
DockerOperator 有一个参数 xcom_push,设置后会将 Docker 容器的输出推送到 Xcom: t1 = DockerOperator(task_id='run-hello-wo
我有任务失败时的松弛警报,但我也希望有恢复消息。 当任务最初失败时,它会在其 on_failure_callback 中执行 xcom_push。我在此处保存的内容可在下一次 DAG 运行中使用: c
在我的 Airflow dag 中,我有一个 ecs_operator 任务,然后是 python 运算符(operator)任务。我想使用 Airflow 的 xcom 功能将一些消息从 ECS 任
我有一个 Airflow 管道,我需要从 pubsub 订阅中获取文件名,然后将该文件导入到云 sql 实例中。我使用 CloudSqlInstanceImportOperator 导入 CSV 文件
我在 TaskGroup 中有两个任务需要提取 xcom 值以提供 job_flow_id 和 step_id。这是代码: with TaskGroup('execute_my_steps') a
我在 TaskGroup 中有两个任务需要提取 xcom 值以提供 job_flow_id 和 step_id。这是代码: with TaskGroup('execute_my_steps') a
我已经在 DAG(Bash 和 Docker Operators)中成功创建了动态任务,但是我很难将这些动态创建的任务传递给 xcom_pull 以获取数据。 for i in range(0, ma
指定多个任务时,task_ids 如何工作? 在这个特定的代码示例中,我希望从元组 (5555,22222) 中的两个任务中检索 load_cycle_id_2,但结果却是 (None, 22222)
我正在尝试从 XCOM 变量生成一组动态任务。在 XCOM 中,我正在存储一个列表,我想使用列表中的每个元素来动态创建下游任务。 我的用例是我有一个上游运算符(operator)检查文件的 sftp
我正在尝试设置动态序列 etl 作业,它将使用 XCOM 从运行的第一个任务中获取数据。这是当前代码: from airflow import DAG from airflow.operators.b
创建了一个图像包含 /airflow/xcom/return.json在所有子目录上使用 chmod +x由于日志显示找不到文件或目录(已尝试 chmod +x) strtpodbefore = Ku
我创建了一个 xcom,我想将结果作为 PostgresOperator 参数获取。我试过了 my_task = PostgresOperator( task_id=‘my_task',
我正在尝试获取 XCOM使用 API 的特定 DAG 的值? 找不到任何方法。 有什么想法吗?! 最佳答案 Airflow 可能从版本 2 开始就公开了 API。 Airflow API xcom 值
主要问题:如果不存在,我正在尝试创建一个 BigQuery 表。 联系方式:使用 BigQueryTableSensor 检查表是否存在,并根据返回值,使用 BigQueryCreateEmptyTa
现在,我使用这样的变量创建了多个任务,它工作正常。 with DAG(....) as dag: body = Variable.get("config_table", deserialize
我需要引用 BashOperator 返回的变量。在我的 task_archive_s3_file 中,我需要从 get_s3_file 获取文件名。该任务只是将 {{ ti.xcom_pull(ta
现在,我使用这样的变量创建了多个任务,它工作正常。 with DAG(....) as dag: body = Variable.get("config_table", deserialize
我广泛搜索了 Airflow 博客和文档来调试我遇到的问题。 我想要解决的问题 检查 ftp 服务器上是否存在特定文件 如果存在,请将其上传到云端 如果不存在,请向客户发送电子邮件,报告未找到文件 我
我尝试通过 airflow cli test 命令测试 2 个任务` 第一个任务运行,自动将最后一个控制台推送到 xcom,我按预期在 Airflow GUI 中看到了值 some value 当我通
我是一名优秀的程序员,十分优秀!