
loops - How to pass a loop variable and dynamic task IDs into Jinja templates in Airflow BashOperator and SSHOperator

Reposted. Author: 行者123. Updated: 2023-12-02 03:12:43

I am trying to create multiple tasks in a loop and pass the dynamically generated PythonOperator task IDs into BashOperator and SSHOperator for the XCom pull.

for group_key in range(1, 5):
    dag = create_dag(group_key)
    globals()[dag.dag_id] = dag

    for i in range(2):
        delete_xcom_task = PostgresOperator(
            task_id='delete-xcom-task_' + str(i),
            postgres_conn_id='pg_airflow_db',
            sql="delete from xcom where dag_id= '" + dag.dag_id + "' ",
            dag=dag)

        t0_get_next = PythonOperator(
            task_id='load_hist_pkg_to_ts_' + str(i),
            provide_context=True,
            python_callable=load_hist_pkg_to_ts,
            xcom_push=True,
            op_kwargs={'pg_conn_id': pg_conn_id,
                       'pg_conn_schema': pg_conn_schema,
                       },
            dag=dag)

        t1_check_file = ShortCircuitOperator(
            task_id='check_if_file_exist_' + str(i),
            python_callable=_check_files,
            provide_context=True,
            op_kwargs={'load_hist_pkg_to_ts_cnt': 'load_hist_pkg_to_ts_' + str(i),
                       },
            dag=dag,
        )

        t0_move_file = BashOperator(
            task_id='bash_move_file_' + str(i),
            bash_command="{{ ti.xcom_pull(key = 't0_bash_move_file' , task_ids = 'load_hist_pkg_to_ts_~i~' , dag_id = ti.dag_id) }}",
            dag=dag)

        t1_copyfile = SSHOperator(
            ssh_conn_id='ssh_execute_rempte',
            task_id='ssh_file_to_postgres_' + str(i),
            xcom_push=True,
            command="{{ ti.xcom_pull(key = 't1_bash_copy' , task_ids = 'load_hist_pkg_to_ts_~i~' , dag_id = ti.dag_id) }}",
            dag=dag)

        t2_archive_file = BashOperator(
            task_id='bash_archive_file_' + str(i),
            bash_command="{{ ti.xcom_pull(key = 't2_bash_remove_file' , task_ids = 'load_hist_pkg_to_ts_~i~' , dag_id = ti.dag_id) }}",
            dag=dag)

        delete_xcom_task >> t0_get_next >> t1_check_file >> t0_move_file >> t1_copyfile >> t2_archive_file

Best answer

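Solved. The loop index has to be substituted in Python with str.format() before Jinja ever sees the template, so the Jinja braces are doubled to survive the formatting step. I had to do it like this: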

bash_command= "{{{{ ti.xcom_pull(key = 't2_bash_remove_file' , task_ids = 'load_hist_pkg_to_ts_{}' , dag_id = ti.dag_id)}}}}".format(i),
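
To see what that line hands to Airflow, here is a minimal sketch that builds the same kind of template string without importing Airflow at all. The helper name xcom_pull_template is hypothetical (it is not part of Airflow); the key and task ID values are the ones from the question.

```python
def xcom_pull_template(key, upstream_task_id):
    """Build a Jinja expression that pulls `key` from `upstream_task_id`.

    Hypothetical helper for illustration only; not an Airflow API.
    """
    # In str.format, '{{' and '}}' are escapes for a literal '{' and '}',
    # so four braces in the source leave the two braces Jinja expects.
    return (
        "{{{{ ti.xcom_pull(key='{key}', task_ids='{task_id}', "
        "dag_id=ti.dag_id) }}}}"
    ).format(key=key, task_id=upstream_task_id)


# One template per loop iteration, mirroring `for i in range(2)` above.
templates = [
    xcom_pull_template("t2_bash_remove_file", "load_hist_pkg_to_ts_{}".format(i))
    for i in range(2)
]

for template in templates:
    print(template)
```

Each printed string is an ordinary Jinja template with the task ID already baked in, which is what bash_command or command should receive. The original `~i~` attempt could not work because it sat inside a quoted string in the template, and the Python loop variable i is not part of the Jinja rendering context in any case.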

Regarding "loops - How to pass a loop variable and dynamic task IDs into Jinja templates in Airflow BashOperator and SSHOperator", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/57151376/
