gpt4 book ai didi

airflow - 保证一些操作符将在同一个 Airflow worker 上执行

转载 作者:行者123 更新时间:2023-12-05 02:18:17 25 4
gpt4 key购买 nike

我有一个 DAG

  1. 从云存储下载一个 csv 文件
  2. 通过 https 将 csv 文件上传到第三方

我正在执行的 airflow 集群默认使用 CeleryExecutor,所以我担心在某些时候当我扩大 worker 数量时,这些任务可能会在不同的 worker 上执行。例如。工作人员 A 进行下载,工作人员 B 尝试上传,但没有找到文件(因为它在工作人员 A 上)

是否有可能以某种方式保证下载和上传操作符都将在同一个 airflow worker 上执行?

最佳答案

将第 1 步(csv 下载)和第 2 步(csv 上传)放入一个 subdag,然后通过带有 executor 的 SubDagOperator 触发它选项设置为 SequentialExecutor - 这将确保第 1 步和第 2 步在同一个 worker 上运行。

这是一个工作的 DAG 文件,说明了该概念(实际操作作为 DummyOperators stub ),以及一些更大进程上下文中的下载/上传步骤:

from datetime import datetime, timedelta
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.subdag_operator import SubDagOperator
from airflow.executors.sequential_executor import SequentialExecutor

PARENT_DAG_NAME='subdaggy'
CHILD_DAG_NAME='subby'

def make_sub_dag(parent_dag_name, child_dag_name, start_date, schedule_interval):
dag = DAG(
'%s.%s' % (parent_dag_name, child_dag_name),
schedule_interval=schedule_interval,
start_date=start_date
)

task_download = DummyOperator(
task_id = 'task_download_csv',
dag=dag
)

task_upload = DummyOperator(
task_id = 'task_upload_csv',
dag=dag
)

task_download >> task_upload

return dag
main_dag = DAG(
PARENT_DAG_NAME,
schedule_interval=None,
start_date=datetime(2017,1,1)
)

main_task_1 = DummyOperator(
task_id = 'main_1',
dag = main_dag
)

main_task_2 = SubDagOperator(
task_id = CHILD_DAG_NAME,
subdag=make_sub_dag(PARENT_DAG_NAME, CHILD_DAG_NAME, main_dag.start_date, main_dag.schedule_interval),
executor=SequentialExecutor(),
dag=main_dag
)

main_task_3 = DummyOperator(
task_id = 'main_3',
dag = main_dag
)

main_task_1 >> main_task_2 >> main_task_3

关于airflow - 保证一些操作符将在同一个 Airflow worker 上执行,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45842564/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com