gpt4 book ai didi

Airflow:动态 SubDag 创建

转载 作者:行者123 更新时间:2023-12-01 18:28:37 25 4
gpt4 key购买 nike

我有一个用例,其中有一个客户列表。可以在列表中添加或删除客户端,并且它们可以具有不同的开始日期和不同的初始参数。

我想使用 Airflow 根据每个客户端的初始开始日期回填所有数据+如果出现故障则重新运行。我正在考虑为每个客户创建一个 SubDag。这能解决我的问题吗?

如何根据 client_id 动态创建 SubDags?

最佳答案

您绝对可以动态创建 DAG 对象:

def make_client_dag(parent_dag, client):
return DAG(
'%s.client_%s' % (parent_dag.dag_id, client.name),
start_date = client.start_date
)

然后,您可以在主 dag 的 SubDagOperator 中使用该方法:

for client in clients:
SubDagOperator(
task_id='client_%s' % client.name,
dag=main_dag,
subdag = make_client_dag(main_dag, client)
)

这将创建一个特定于clients集合中每个成员的子dag,并且每个子dag都将在下次调用主dag时运行。我不确定您是否会得到您想要的回填行为。

关于Airflow:动态 SubDag 创建,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45746482/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com