gpt4 book ai didi

Airflow - 如何将 xcom 变量传递到 Python 函数中

转载 作者:行者123 更新时间:2023-12-02 04:46:18 25 4
gpt4 key购买 nike

我需要引用 BashOperator 返回的变量。在我的 task_archive_s3_file 中,我需要从 get_s3_file 获取文件名。该任务只是将 {{ ti.xcom_pull(task_ids=submit_file_to_spark) }} 打印为字符串而不是值。

如果我使用bash_command,该值将正确打印。

get_s3_file = PythonOperator(
task_id='get_s3_file',
python_callable=obj.func_get_s3_file,
trigger_rule=TriggerRule.ALL_SUCCESS,
dag=dag)

submit_file_to_spark = BashOperator(
task_id='submit_file_to_spark',
bash_command="echo 'hello world'",
trigger_rule="all_done",
xcom_push=True,
dag=dag)

task_archive_s3_file = PythonOperator(
task_id='archive_s3_file',
# bash_command="echo {{ ti.xcom_pull(task_ids='submit_file_to_spark') }}",
python_callable=obj.func_archive_s3_file,
params={'s3_path_filename': "{{ ti.xcom_pull(task_ids=submit_file_to_spark) }}" },
dag=dag)

get_s3_file >> submit_file_to_spark >> task_archive_s3_file

最佳答案

对问题和答案都投了赞成票,但我认为对于那些只想在 DAG 中的 PythonOperator 任务之间传递小数据对象的用户来说,这可以更清楚一些。引用这个问题和this XCom example让我找到了以下解决方案。 super 简单:

from datetime import datetime
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator

DAG = DAG(
dag_id='example_dag',
start_date=datetime.now(),
schedule_interval='@once'
)

def push_function(**kwargs):
ls = ['a', 'b', 'c']
return ls

push_task = PythonOperator(
task_id='push_task',
python_callable=push_function,
provide_context=True,
dag=DAG)

def pull_function(**kwargs):
ti = kwargs['ti']
ls = ti.xcom_pull(task_ids='push_task')
print(ls)

pull_task = PythonOperator(
task_id='pull_task',
python_callable=pull_function,
provide_context=True,
dag=DAG)

push_task >> pull_task

我不确定为什么会这样,但确实如此。向社区提出的几个问题:

  • 这里的 ti 发生了什么?它是如何内置到 **kwargs 中的?
  • 这两个函数都需要 provide_context=True 吗?

非常欢迎任何使这个答案更清晰的编辑!

关于Airflow - 如何将 xcom 变量传递到 Python 函数中,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46059161/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com