gpt4 book ai didi

airflow - 从应在 Airflow 中按顺序运行的函数返回任务列表

转载 作者:行者123 更新时间:2023-12-02 18:59:06 25 4
gpt4 key购买 nike

我想从一个函数返回 2 个或更多任务,这些任务应该在它们插入依赖项的位置按顺序运行,请参见下文。

t1 = PythonOperator()

def generate_tasks():
t2 = PythonOperator()
t3 = PythonOperator()
return magic(t2, t3) # magic needed here (preferably)

t1 >> generate_tasks() # otherwise here
# desired result: t1 >> t2 >> t3

这可行吗?据我了解,Airflow 2.0 似乎通过 TaskGroup 实现了这一目标,但我们使用的是 Google 的 Composer,2.0 暂时不会提供。

我发现的最佳解决方法:

t1 = PythonOperator()

def generate_tasks():
t2 = PythonOperator()
t3 = PythonOperator()
return [t2, t3]

tasks = generate_tasks()
t1 >> tasks[0] >> tasks[1]

但我真的希望将其抽象化,因为它或多或少违背了从单个函数返回多个运算符的目的。我们希望它是最终用户所知的单个单元,即使它可以由 2 个或更多任务组成。

如何使用 Airflow 2.0 中的任务组来完成此操作:

class Encryptor:
def encrypt_and_archive(self):
with TaskGroup("archive_and_encrypt") as section_1:
encrypt = DummyOperator(task_id="encrypt")
archive = BashOperator(task_id="archive", bash_command='echo 1')
encrypt >> archive
return section_1

with DAG(dag_id="example_return_task_group", start_date=days_ago(2), tags=["example"]) as dag:
start = DummyOperator(task_id="start")
encrypt_and_archive = Encryptor().encrypt_and_archive()
end = DummyOperator(task_id='end')

# 👇 single variable, containing two tasks
start >> encrypt_and_archive >> end

这将创建以下图表:

TaskGroup in a graph

2.0 之前是否可以远程执行类似的操作?

最佳答案

你没有解释magic(t2, t3)是什么。TaskGroup 是严格的 UI 功能,它不会影响 DAG 逻辑。根据您的描述,您似乎正在寻找特定的逻辑(否则什么是magic?)。

我相信这就是您所追求的:

default_args = {
'owner': 'airflow',
'start_date': datetime(2021, 1, 24),
}
def generate_tasks():
operator_list =[]
for i in range(5): # Replace to generate the logic you wish to dynamically create tasks
op = DummyOperator(task_id=f"t{str(i)}_task", dag=dag)
if i>0:
operator_list[i - 1] >> op
operator_list.append(op)
return operator_list

with DAG(
dag_id='loop',
default_args=default_args,
schedule_interval=None,
) as dag:
start_op = DummyOperator(task_id='start_task')
end_op = DummyOperator(task_id='end_task')
tasks = generate_tasks()
start_op >> tasks[0]
tasks[-1] >> end_op

enter image description here

您可以将 DummyOperator 替换为您想要的任何运算符。

关于airflow - 从应在 Airflow 中按顺序运行的函数返回任务列表,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/65817709/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com