gpt4 book ai didi

configuration - 如何在 Airflow 中使用配置文件

转载 作者:行者123 更新时间:2023-12-03 16:40:31 24 4
gpt4 key购买 nike

在 Airflow 中,我们创建了几个 DAGS。其中一些具有共同的属性,例如从中读取文件的目录。目前,这些属性被列为每个单独的 DAG 中的属性,这在 future 显然会成为问题。假设要更改目录名称,我们必须进入每个 DAG 并更新这段代码(甚至可能丢失一个)。

我正在研究创建某种配置文件,当需要某个属性时,可以将其解析为 Airflow 并由各种 DAGS 使用,但我似乎找不到任何关于如何执行此操作的文档或指南。我能找到的最多的是关于设置连接 ID 的文档,但这不符合我的用例。

我的帖子的问题是,是否可以执行上述方案以及如何执行?

提前致谢。

最佳答案

根据您的设置,有几种方法可以完成此操作:

  • 您可以使用 DagFactory 类型的方法,其中您有一个函数生成 DAG。你可以找到一个看起来像 here 的例子。
  • 您可以将 JSON 配置存储为 Airflow Variable ,并对其进行解析以生成 DAG。您可以在 Admin -> Variables 中存储类似的内容:

  • [{
    "table": "users",
    "schema":"app_one",
    "s3_bucket":"etl_bucket",
    "s3_key":"app_one_users",
    "redshift_conn_id":"postgres_default" },
    {
    "table": "users",
    "schema":"app_two",
    "s3_bucket":"etl_bucket",
    "s3_key":"app_two_users",
    "redshift_conn_id":"postgres_default"}]

    您的 DAG 可能会生成为:
    sync_config = json.loads(Variable.get("sync_config"))

    with dag:
    start = DummyOperator(task_id='begin_dag')
    for table in sync_config:
    d1 = RedshiftToS3Transfer(
    task_id='{0}'.format(table['s3_key']),
    table=table['table'],
    schema=table['schema'],
    s3_bucket=table['s3_bucket'],
    s3_key=table['s3_key'],
    redshift_conn_id=table['redshift_conn_id']
    )
    start >> d1

    同样,您可以将该配置存储为本地文件,然后像打开任何其他文件一样打开它。请记住,对此的最佳答案将取决于您的基础架构和用例。

    关于configuration - 如何在 Airflow 中使用配置文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52006063/

    24 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com