- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在尝试在循环中创建多个任务,并在 BashOperator
和 SSHOperator
中传递动态生成的 PythonOperator
任务 ID 以进行 XCOM 拉取.
for group_key in range(1,5):
dag = create_dag(group_key)
globals()[dag.dag_id] = dag
for i in range(2):
delete_xcom_task = PostgresOperator(
task_id='delete-xcom-task_'+str(i),
postgres_conn_id='pg_airflow_db',
sql= "delete from xcom where dag_id= '" + dag.dag_id + "' ",
dag=dag)
t0_get_next = PythonOperator(
task_id='load_hist_pkg_to_ts_'+ str(i),
provide_context=True,
python_callable=load_hist_pkg_to_ts,
xcom_push=True,
op_kwargs={'pg_conn_id':pg_conn_id,
'pg_conn_schema': pg_conn_schema,
},
dag=dag)
t1_check_file=ShortCircuitOperator(
task_id='check_if_file_exist_'+str(i),
python_callable=_check_files,
provide_context=True,
op_kwargs={'load_hist_pkg_to_ts_cnt':'load_hist_pkg_to_ts_'+str(i),
},
dag=dag,
)
t0_move_file = BashOperator(
task_id='bash_move_file_'+str(i),
bash_command= "{{ ti.xcom_pull(key = 't0_bash_move_file' , task_ids = 'load_hist_pkg_to_ts_~i~' , dag_id = ti.dag_id) }}",
dag=dag)
t1_copyfile = SSHOperator(
ssh_conn_id='ssh_execute_rempte',
task_id='ssh_file_to_postgres_'+str(i),
xcom_push=True,
command= "{{ ti.xcom_pull(key = 't1_bash_copy' , task_ids = 'load_hist_pkg_to_ts_~i~' , dag_id = ti.dag_id) }}",
dag=dag)
t2_archive_file = BashOperator(
task_id='bash_archive_file_'+str(i),
bash_command= "{{ ti.xcom_pull(key = 't2_bash_remove_file' , task_ids = 'load_hist_pkg_to_ts_~i~' , dag_id = ti.dag_id) }}",
dag=dag)
delete_xcom_task>>t0_get_next >>t1_check_file>>t0_move_file>>t1_copyfile>>t2_archive_file
最佳答案
已解决 - 必须像下面这样做
bash_command= "{{{{ ti.xcom_pull(key = 't2_bash_remove_file' , task_ids = 'load_hist_pkg_to_ts_{}' , dag_id = ti.dag_id)}}}}".format(i),
关于loops - 如何在 Airflow BashOperator 和 SSHOperator 中的 jinja 模板中传递循环和动态任务 ID,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57151376/
我遇到一个问题,BashOperator 没有记录 wget 的所有输出。它只会记录输出的前 1-5 行。 我已经尝试过仅使用 wget 作为 bash 命令: tester = BashOperat
有没有办法将命令行参数传递给 Airflow BashOperator。目前,我有一个 python 脚本,它接受日期参数并执行一些特定的事件,例如清理比给定日期早的特定文件夹。 在只有一个任务的简化
请耐心等待,因为我刚开始使用 Airflow,我想做的是从 BashOperator 任务收集返回码并将其保存到局部变量,然后根据该返回码分支出来到另一个任务。我遇到的问题是弄清楚如何让 BashOp
我在 Centos 7 中使用 Airflow,使用 Python 3.7。 当我通过 BashOperator 运行 Bash 命令时,我遇到了以下问题: [2019-11-13 23:20:08,
我的 dag 构建器模块中有一组任务,该模块使用 Airflow 中全局使用的 Python 运算符。我正在 kubernetes 上使用 docker 部署 Airflow 。 任务失败并显示错误消
有没有办法通过 ssh 连接到不同的服务器并使用 Airbnb 的 Airflow 运行 BashOperator?我正在尝试使用 Airflow 运行 hive sql 命令,但我需要通过 SSH
使用 BashOperator 运行 Python 脚本时 Airflow 1.10.9, task_1 = BashOperator( task_id='task_1', bash_
这些天我正在从事一个新的 ETL 项目,我想尝试将 Airflow 作为作业经理。 我和我的同事都是第一次在 Airflow 上工作,我们采用了两种不同的方法:我决定编写 python 函数(类似于
这是我的 dag 文件和 BashOperator 任务: my_dag = { dag_id = 'my_dag', start_date = datetime(year=2017, month=3
我刚开始使用 apache airflow。我正在尝试从 Airflow 运行 test.sh 文件,但它不起作用。 以下是我的代码,文件名为test.py import os from airflo
我正在尝试编写我们的第一个 Airflow DAG,当我尝试使用命令 airflow list_tasks orderwarehouse 列出任务时出现以下错误: Traceback (most re
我似乎对 BashOperator 有疑问。我在 Miniconda 环境(Python 3.6)中使用安装在 CentOS 上的 Airflow 1.10,使用的是 Conda Forge 上的软件
升级到 Airflow 2 后,我在一些 DAG 中遇到了这个错误: ImportError: cannot import name 'BashOperator' from 'airflow.oper
我不知道如何解决这个问题——与 ModuleNotFoundError 相关的其他帖子可以通过重新安装相关包来解决,但很明显这不是问题,因为示例 bash 运算符DAG 运行。那么我的问题与 Airf
我不断从当前正在回填的预定 BashOperator 中收到相同的错误(“落后”了一个多月)。 [2018-06-10 22:06:33,558] {base_task_runner.py:115}
非常喜欢 Airflow 工作流调度程序,但在运行一个简单的 DAG 时遇到错误:“{jobs.py:538} 错误 - DAG 运行因 DAG 陷入僵局:TEST_SCHEDULER_DAG”。 这
我正在尝试在循环中创建多个任务,并在 BashOperator 和 SSHOperator 中传递动态生成的 PythonOperator 任务 ID 以进行 XCOM 拉取. for group_k
我正在尝试在循环中创建多个任务,并在 BashOperator 和 SSHOperator 中传递动态生成的 PythonOperator 任务 ID 以进行 XCOM 拉取. for group_k
我正在使用 Celery 和 Redis 运行 Airflow,它工作得很好,但我在工作端遇到了问题。我有两个 docker-compose 文件,一个是在服务器上运行的 master,一个是在其他
我是一名优秀的程序员,十分优秀!