gpt4 book ai didi

python - 如何使用 Airflow 变量动态运行 DAG 的多个作业

转载 作者:行者123 更新时间:2023-12-01 08:02:23 25 4
gpt4 key购买 nike

我知道如何使用变量动态运行 DAG 的任务,并且它工作得很好,直到您触发同一 DAG 的多次运行。

即,一旦在data/to/load/目录下创建了一个包含文件的新目录,我就会在某处编写一个脚本,该脚本将触发airflow变量-set dir data/to/load/$newDir 后跟 airflow trigger_dag dyn_test。现在假设目录“a”和“b”在data/to/load/下创建(同时),这将使airflow变量+airflow trigger_dag调用两次变量集调用有两个不同的输入(一个以“a”为后缀,另一个以“b”为后缀)。我在 Airflow GUI 中看到两个为 DAG 运行的作业,但问题是它们都考虑相同的目录值 a 或 b。这肯定意味着它需要最终的“Airflow 变量设置”调用。我该如何解决?触发多次运行的方法是什么,每次运行都采用不同的值(在 dir 变量中)来动态循环。我的达格看起来像这样:

# Using Airflow Variables
from airflow.models import Variable
dir = Variable.get("dir")


args = {
'owner': 'airflow',
'start_date': datetime(2004, 11, 12),
}

dag = DAG(
dag_id='dyn_test',
default_args=args,
schedule_interval='@once'
)


filesInDir = next(os.walk(dir))[2]

for file in filesInDir:
task1 = # change 'file' structure
task2 = # store changed 'file'

task1 >> task2

最佳答案

您的问题中描述的场景是一个先进先出队列适合的场景,假设您希望保留显式设置要作为单独处理的目录的当前方式序列。

也就是说,Airflow CLI trigger_dags 命令允许传递 --conf 标志来设置在 DagRun 中传递的配置字典,我正如您所描述的那样,在设置变量的地方,就会触发 dag。

http://airflow.apache.org/cli.html#trigger_dag

这是代码中可能的样子。

airflow trigger_dag dyn_test --conf '{"me_seeks.dir": "data/to/load/$newDir"}'

您将在用于任务的 Airflow 运算符中设置 provide_context kwargs

可以在上下文中检索 DagRun 的实例,并在检索的配置中设置 dir

假设您使用 Airflow PythonOperator 定义了任务;那么您在 python_callable 中检索 dir 的代码将类似于以下内容:

def me_seeks(dag_run=None):
dir = dag_run.conf['me_seeks.dir']

关于python - 如何使用 Airflow 变量动态运行 DAG 的多个作业,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55674682/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com