gpt4 book ai didi

python - Airflow BashOperator 收集返回码

转载 作者:太空狗 更新时间:2023-10-29 20:57:56 25 4
gpt4 key购买 nike

请耐心等待,因为我刚开始使用 Airflow,我想做的是从 BashOperator 任务收集返回码并将其保存到局部变量,然后根据该返回码分支出来到另一个任务。我遇到的问题是弄清楚如何让 BashOperator 返回一些东西。以下是我的代码段:

dag = DAG(dag_id='dag_1',
default_args=default_args,
schedule_interval='0 2 * * *',
user_defined_macros=user_def_macros,
dagrun_timeout=timedelta(minutes=60)
)
oodas = BashOperator(task_id='oodas', xcom_push=True, bash_command="hive -hiveconf SCHEMA={{ schema }} -hiveconf DAY={{ yesterday_ds }} -f {{ script_path }}", dag=dag)
t2 = BashOperator(task_id='t2', bash_command='echo "{{ ti.xcom_pull("oodas") }}"', dag=dag)
t2.set_upstream(oodas)

我正在尝试 xcom_push 但老实说不知道它是如何工作的。这是收集结果的正确方法吗?在日志中,最后一行是:Command exited with return code 0

最佳答案

根据 BashOperator doc ,

If xcom_push is True, the last line written to stdout will also be pushed to an XCom when the bash command completes

知道这一点,您只需要让您的 bash 脚本最后打印错误代码,因此将以下内容附加到您的 bash_command :

<your code> ; echo $?

在你的情况下,它是:

oodas = BashOperator(task_id='oodas', xcom_push=True, bash_command="hive -hiveconf SCHEMA={{ schema }} -hiveconf DAY={{ yesterday_ds }} -f {{ script_path }}; echo $?", dag=dag)

关于python - Airflow BashOperator 收集返回码,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42292345/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com