- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我需要引用 BashOperator
返回的变量。在我的 task_archive_s3_file
中,我需要从 get_s3_file
获取文件名。该任务只是将 {{ ti.xcom_pull(task_ids=submit_file_to_spark) }}
打印为字符串而不是值。
如果我使用bash_command
,该值将正确打印。
get_s3_file = PythonOperator(
task_id='get_s3_file',
python_callable=obj.func_get_s3_file,
trigger_rule=TriggerRule.ALL_SUCCESS,
dag=dag)
submit_file_to_spark = BashOperator(
task_id='submit_file_to_spark',
bash_command="echo 'hello world'",
trigger_rule="all_done",
xcom_push=True,
dag=dag)
task_archive_s3_file = PythonOperator(
task_id='archive_s3_file',
# bash_command="echo {{ ti.xcom_pull(task_ids='submit_file_to_spark') }}",
python_callable=obj.func_archive_s3_file,
params={'s3_path_filename': "{{ ti.xcom_pull(task_ids=submit_file_to_spark) }}" },
dag=dag)
get_s3_file >> submit_file_to_spark >> task_archive_s3_file
最佳答案
对问题和答案都投了赞成票,但我认为对于那些只想在 DAG 中的 PythonOperator
任务之间传递小数据对象的用户来说,这可以更清楚一些。引用这个问题和this XCom example让我找到了以下解决方案。 super 简单:
from datetime import datetime
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
DAG = DAG(
dag_id='example_dag',
start_date=datetime.now(),
schedule_interval='@once'
)
def push_function(**kwargs):
ls = ['a', 'b', 'c']
return ls
push_task = PythonOperator(
task_id='push_task',
python_callable=push_function,
provide_context=True,
dag=DAG)
def pull_function(**kwargs):
ti = kwargs['ti']
ls = ti.xcom_pull(task_ids='push_task')
print(ls)
pull_task = PythonOperator(
task_id='pull_task',
python_callable=pull_function,
provide_context=True,
dag=DAG)
push_task >> pull_task
我不确定为什么会这样,但确实如此。向社区提出的几个问题:
ti
发生了什么?它是如何内置到 **kwargs
中的?provide_context=True
吗?非常欢迎任何使这个答案更清晰的编辑!
关于Airflow - 如何将 xcom 变量传递到 Python 函数中,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46059161/
自过去 6 个月以来,我一直在使用 Airlfow。我很高兴在 Airflow 中定义工作流程。 我有以下场景,我无法获得 xcom 值(以黄色突出显示)。 请在以下示例代码中找到代码: 工作流程 d
DockerOperator 有一个参数 xcom_push,设置后会将 Docker 容器的输出推送到 Xcom: t1 = DockerOperator(task_id='run-hello-wo
我有任务失败时的松弛警报,但我也希望有恢复消息。 当任务最初失败时,它会在其 on_failure_callback 中执行 xcom_push。我在此处保存的内容可在下一次 DAG 运行中使用: c
在我的 Airflow dag 中,我有一个 ecs_operator 任务,然后是 python 运算符(operator)任务。我想使用 Airflow 的 xcom 功能将一些消息从 ECS 任
我有一个 Airflow 管道,我需要从 pubsub 订阅中获取文件名,然后将该文件导入到云 sql 实例中。我使用 CloudSqlInstanceImportOperator 导入 CSV 文件
我在 TaskGroup 中有两个任务需要提取 xcom 值以提供 job_flow_id 和 step_id。这是代码: with TaskGroup('execute_my_steps') a
我在 TaskGroup 中有两个任务需要提取 xcom 值以提供 job_flow_id 和 step_id。这是代码: with TaskGroup('execute_my_steps') a
我已经在 DAG(Bash 和 Docker Operators)中成功创建了动态任务,但是我很难将这些动态创建的任务传递给 xcom_pull 以获取数据。 for i in range(0, ma
指定多个任务时,task_ids 如何工作? 在这个特定的代码示例中,我希望从元组 (5555,22222) 中的两个任务中检索 load_cycle_id_2,但结果却是 (None, 22222)
我正在尝试从 XCOM 变量生成一组动态任务。在 XCOM 中,我正在存储一个列表,我想使用列表中的每个元素来动态创建下游任务。 我的用例是我有一个上游运算符(operator)检查文件的 sftp
我正在尝试设置动态序列 etl 作业,它将使用 XCOM 从运行的第一个任务中获取数据。这是当前代码: from airflow import DAG from airflow.operators.b
创建了一个图像包含 /airflow/xcom/return.json在所有子目录上使用 chmod +x由于日志显示找不到文件或目录(已尝试 chmod +x) strtpodbefore = Ku
我创建了一个 xcom,我想将结果作为 PostgresOperator 参数获取。我试过了 my_task = PostgresOperator( task_id=‘my_task',
我正在尝试获取 XCOM使用 API 的特定 DAG 的值? 找不到任何方法。 有什么想法吗?! 最佳答案 Airflow 可能从版本 2 开始就公开了 API。 Airflow API xcom 值
主要问题:如果不存在,我正在尝试创建一个 BigQuery 表。 联系方式:使用 BigQueryTableSensor 检查表是否存在,并根据返回值,使用 BigQueryCreateEmptyTa
现在,我使用这样的变量创建了多个任务,它工作正常。 with DAG(....) as dag: body = Variable.get("config_table", deserialize
我需要引用 BashOperator 返回的变量。在我的 task_archive_s3_file 中,我需要从 get_s3_file 获取文件名。该任务只是将 {{ ti.xcom_pull(ta
现在,我使用这样的变量创建了多个任务,它工作正常。 with DAG(....) as dag: body = Variable.get("config_table", deserialize
我广泛搜索了 Airflow 博客和文档来调试我遇到的问题。 我想要解决的问题 检查 ftp 服务器上是否存在特定文件 如果存在,请将其上传到云端 如果不存在,请向客户发送电子邮件,报告未找到文件 我
我尝试通过 airflow cli test 命令测试 2 个任务` 第一个任务运行,自动将最后一个控制台推送到 xcom,我按预期在 Airflow GUI 中看到了值 some value 当我通
我是一名优秀的程序员,十分优秀!