- c - 在位数组中找到第一个零
- linux - Unix 显示有关匹配两种模式之一的文件的信息
- 正则表达式替换多个文件
- linux - 隐藏来自 xtrace 的命令
请耐心等待,因为我刚开始使用 Airflow,我想做的是从 BashOperator 任务收集返回码并将其保存到局部变量,然后根据该返回码分支出来到另一个任务。我遇到的问题是弄清楚如何让 BashOperator 返回一些东西。以下是我的代码段:
dag = DAG(dag_id='dag_1',
default_args=default_args,
schedule_interval='0 2 * * *',
user_defined_macros=user_def_macros,
dagrun_timeout=timedelta(minutes=60)
)
oodas = BashOperator(task_id='oodas', xcom_push=True, bash_command="hive -hiveconf SCHEMA={{ schema }} -hiveconf DAY={{ yesterday_ds }} -f {{ script_path }}", dag=dag)
t2 = BashOperator(task_id='t2', bash_command='echo "{{ ti.xcom_pull("oodas") }}"', dag=dag)
t2.set_upstream(oodas)
我正在尝试 xcom_push 但老实说不知道它是如何工作的。这是收集结果的正确方法吗?在日志中,最后一行是:Command exited with return code 0。
最佳答案
根据 BashOperator doc ,
If xcom_push is True, the last line written to stdout will also be pushed to an XCom when the bash command completes
知道这一点,您只需要让您的 bash 脚本最后打印错误代码,因此将以下内容附加到您的 bash_command
:
<your code> ; echo $?
在你的情况下,它是:
oodas = BashOperator(task_id='oodas', xcom_push=True, bash_command="hive -hiveconf SCHEMA={{ schema }} -hiveconf DAY={{ yesterday_ds }} -f {{ script_path }}; echo $?", dag=dag)
关于python - Airflow BashOperator 收集返回码,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42292345/
我遇到一个问题,BashOperator 没有记录 wget 的所有输出。它只会记录输出的前 1-5 行。 我已经尝试过仅使用 wget 作为 bash 命令: tester = BashOperat
有没有办法将命令行参数传递给 Airflow BashOperator。目前,我有一个 python 脚本,它接受日期参数并执行一些特定的事件,例如清理比给定日期早的特定文件夹。 在只有一个任务的简化
请耐心等待,因为我刚开始使用 Airflow,我想做的是从 BashOperator 任务收集返回码并将其保存到局部变量,然后根据该返回码分支出来到另一个任务。我遇到的问题是弄清楚如何让 BashOp
我在 Centos 7 中使用 Airflow,使用 Python 3.7。 当我通过 BashOperator 运行 Bash 命令时,我遇到了以下问题: [2019-11-13 23:20:08,
我的 dag 构建器模块中有一组任务,该模块使用 Airflow 中全局使用的 Python 运算符。我正在 kubernetes 上使用 docker 部署 Airflow 。 任务失败并显示错误消
有没有办法通过 ssh 连接到不同的服务器并使用 Airbnb 的 Airflow 运行 BashOperator?我正在尝试使用 Airflow 运行 hive sql 命令,但我需要通过 SSH
使用 BashOperator 运行 Python 脚本时 Airflow 1.10.9, task_1 = BashOperator( task_id='task_1', bash_
这些天我正在从事一个新的 ETL 项目,我想尝试将 Airflow 作为作业经理。 我和我的同事都是第一次在 Airflow 上工作,我们采用了两种不同的方法:我决定编写 python 函数(类似于
这是我的 dag 文件和 BashOperator 任务: my_dag = { dag_id = 'my_dag', start_date = datetime(year=2017, month=3
我刚开始使用 apache airflow。我正在尝试从 Airflow 运行 test.sh 文件,但它不起作用。 以下是我的代码,文件名为test.py import os from airflo
我正在尝试编写我们的第一个 Airflow DAG,当我尝试使用命令 airflow list_tasks orderwarehouse 列出任务时出现以下错误: Traceback (most re
我似乎对 BashOperator 有疑问。我在 Miniconda 环境(Python 3.6)中使用安装在 CentOS 上的 Airflow 1.10,使用的是 Conda Forge 上的软件
升级到 Airflow 2 后,我在一些 DAG 中遇到了这个错误: ImportError: cannot import name 'BashOperator' from 'airflow.oper
我不知道如何解决这个问题——与 ModuleNotFoundError 相关的其他帖子可以通过重新安装相关包来解决,但很明显这不是问题,因为示例 bash 运算符DAG 运行。那么我的问题与 Airf
我不断从当前正在回填的预定 BashOperator 中收到相同的错误(“落后”了一个多月)。 [2018-06-10 22:06:33,558] {base_task_runner.py:115}
非常喜欢 Airflow 工作流调度程序,但在运行一个简单的 DAG 时遇到错误:“{jobs.py:538} 错误 - DAG 运行因 DAG 陷入僵局:TEST_SCHEDULER_DAG”。 这
我正在尝试在循环中创建多个任务,并在 BashOperator 和 SSHOperator 中传递动态生成的 PythonOperator 任务 ID 以进行 XCOM 拉取. for group_k
我正在尝试在循环中创建多个任务,并在 BashOperator 和 SSHOperator 中传递动态生成的 PythonOperator 任务 ID 以进行 XCOM 拉取. for group_k
我正在使用 Celery 和 Redis 运行 Airflow,它工作得很好,但我在工作端遇到了问题。我有两个 docker-compose 文件,一个是在服务器上运行的 master,一个是在其他
我是一名优秀的程序员,十分优秀!