- c - 在位数组中找到第一个零
- linux - Unix 显示有关匹配两种模式之一的文件的信息
- 正则表达式替换多个文件
- linux - 隐藏来自 xtrace 的命令
我正在使用一个主 dag (main_dag),其中包含多个子标签,每个子标签都有多个任务。我从 subdagA taskA 中推送了一个 xcom,但我在 subdagB taskB 中拉动了那个 xcom。由于 xcom_pull() 中的 dag_id 参数默认为 self.dag_id,因此我无法提取必要的 xcom。我想知道如何做到这一点和/或是否有更好的方法来设置这种情况,这样我就不必处理这个问题了。
我目前在 subdagB 中所做的示例:
def subdagB(parent_dag, child_dag, start_date, schedule_interval):
subdagB = DAG('%s.%s' % (parent_dag, child_dag), start_date=start_date, schedule_interval=schedule_interval)
start = DummyOperator(
task_id='taskA',
dag=subdagB)
tag_db_template = '''echo {{ task_instance.xcom_pull(dag_id='dag.main_dag.subdagA', task_ids='taskA') }};'''
t1 = BashOperator(
task_id='taskB',
bash_command=tag_db_template,
xcom_push=True,
dag=subdagB)
end = DummyOperator(
task_id='taskC',
dag=subdagB)
t0.set_upstream(start)
t1.set_upstream(t0)
end.set_upstream(t1)
return subdagB
提前感谢您的帮助!
最佳答案
只要您覆盖
中的dag_id
就应该没问题
[Operator].xcom_pull(dag_id=dag_id, ...)
或[TaskInstance].xcom_pull(dag_id=dag_id, ...)
只要确保
dag_id = "{parent_dag_id}.{child_dag_id}"
如果您能让您的示例更完整,我可以尝试在本地运行它,但我测试了一个(类似的)示例,cross-subdag xcoms 按预期工作。
关于python - 从 sub dag 中拉取 xcom,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43501505/
自过去 6 个月以来,我一直在使用 Airlfow。我很高兴在 Airflow 中定义工作流程。 我有以下场景,我无法获得 xcom 值(以黄色突出显示)。 请在以下示例代码中找到代码: 工作流程 d
DockerOperator 有一个参数 xcom_push,设置后会将 Docker 容器的输出推送到 Xcom: t1 = DockerOperator(task_id='run-hello-wo
我有任务失败时的松弛警报,但我也希望有恢复消息。 当任务最初失败时,它会在其 on_failure_callback 中执行 xcom_push。我在此处保存的内容可在下一次 DAG 运行中使用: c
在我的 Airflow dag 中,我有一个 ecs_operator 任务,然后是 python 运算符(operator)任务。我想使用 Airflow 的 xcom 功能将一些消息从 ECS 任
我有一个 Airflow 管道,我需要从 pubsub 订阅中获取文件名,然后将该文件导入到云 sql 实例中。我使用 CloudSqlInstanceImportOperator 导入 CSV 文件
我在 TaskGroup 中有两个任务需要提取 xcom 值以提供 job_flow_id 和 step_id。这是代码: with TaskGroup('execute_my_steps') a
我在 TaskGroup 中有两个任务需要提取 xcom 值以提供 job_flow_id 和 step_id。这是代码: with TaskGroup('execute_my_steps') a
我已经在 DAG(Bash 和 Docker Operators)中成功创建了动态任务,但是我很难将这些动态创建的任务传递给 xcom_pull 以获取数据。 for i in range(0, ma
指定多个任务时,task_ids 如何工作? 在这个特定的代码示例中,我希望从元组 (5555,22222) 中的两个任务中检索 load_cycle_id_2,但结果却是 (None, 22222)
我正在尝试从 XCOM 变量生成一组动态任务。在 XCOM 中,我正在存储一个列表,我想使用列表中的每个元素来动态创建下游任务。 我的用例是我有一个上游运算符(operator)检查文件的 sftp
我正在尝试设置动态序列 etl 作业,它将使用 XCOM 从运行的第一个任务中获取数据。这是当前代码: from airflow import DAG from airflow.operators.b
创建了一个图像包含 /airflow/xcom/return.json在所有子目录上使用 chmod +x由于日志显示找不到文件或目录(已尝试 chmod +x) strtpodbefore = Ku
我创建了一个 xcom,我想将结果作为 PostgresOperator 参数获取。我试过了 my_task = PostgresOperator( task_id=‘my_task',
我正在尝试获取 XCOM使用 API 的特定 DAG 的值? 找不到任何方法。 有什么想法吗?! 最佳答案 Airflow 可能从版本 2 开始就公开了 API。 Airflow API xcom 值
主要问题:如果不存在,我正在尝试创建一个 BigQuery 表。 联系方式:使用 BigQueryTableSensor 检查表是否存在,并根据返回值,使用 BigQueryCreateEmptyTa
现在,我使用这样的变量创建了多个任务,它工作正常。 with DAG(....) as dag: body = Variable.get("config_table", deserialize
我需要引用 BashOperator 返回的变量。在我的 task_archive_s3_file 中,我需要从 get_s3_file 获取文件名。该任务只是将 {{ ti.xcom_pull(ta
现在,我使用这样的变量创建了多个任务,它工作正常。 with DAG(....) as dag: body = Variable.get("config_table", deserialize
我广泛搜索了 Airflow 博客和文档来调试我遇到的问题。 我想要解决的问题 检查 ftp 服务器上是否存在特定文件 如果存在,请将其上传到云端 如果不存在,请向客户发送电子邮件,报告未找到文件 我
我尝试通过 airflow cli test 命令测试 2 个任务` 第一个任务运行,自动将最后一个控制台推送到 xcom,我按预期在 Airflow GUI 中看到了值 some value 当我通
我是一名优秀的程序员,十分优秀!