gpt4 book ai didi

python - 使用数据库信息构建动态 DAG

转载 作者:行者123 更新时间:2023-12-04 03:38:16 25 4
gpt4 key购买 nike

我是 Airflow 的新手,我正在尝试找出使用从数据库检索的信息动态创建一组 DAG 的最佳方法。目前我已经想到了这个可能的解决方案:

# file: dags_builder_dag.py in DAG_FOLDER

# Get info to build required dags from DB
dag_info = api_getDBInfo()
# Dynamically create dags based on info retrieved
for dag in dag_info:
dag_id = 'hello_world_child_{}'.format(str(dag['id']))
default_args_child = {'owner': 'airflow', 'start_date': datetime(2021, 1, 1)}
# Add dag to global scope to let airflow digest it.
globals()[dag_id] = create_dag(dag_id, default_args_child)

但是,如果我没记错的话,所有的 dag 文件,包括本例中生成所有 dag 的文件 (dags_builder_dag.py),都会被 Airflow 定期解析,这意味着 api_getDBInfo() 将被执行在每次解析时。如果我是对的,最好的做法是避免连续执行 api_getDBInfo(),这对数据库来说可能是一个耗时的操作?理想情况下,应仅在需要时检索此信息,比方说在手动触发时。

我想到的其他可能的解决方法:

  • 使用 Airlfow Variable作为评估是否该再次解析的标志 dags_builder_dag.py该变量可以按以下方式使用:
# file: dags_builder_dag.py in DAG_FOLDER

buildDAGs = Variables.get('buildDAGs')
if buildDAGs == 'true':
# Get info to build required dags from DB
dag_info = api_getDBInfo()
# Dynamically create dags based on info retrieved
for dag in dag_info:
dag_id = 'hello_world_child_{}'.format(str(dag['id']))
default_args_child = {'owner': 'airflow', 'start_date': datetime(2021, 1, 1)}
# Add dag to global scope to let airflow digest it.
globals()[dag_id] = create_dag(dag_id, default_args_child)
  • 将 airflow.cfg 文件的参数 min_file_process_interval 设置为更高的值,以避免连续解析。但是,这也有增加 dags 运行时延迟的缺点。

更新

感谢@NicoE 和@floating_hammer,我找到了适合我的用例的解决方案。

第一次尝试:Airflow 变量作为缓存

我可以使用 Airflow 变量作为存储在数据库中的数据的缓存,以避免连续调用“api_getDBInfo”。然而,通过这种方式,我遇到了另一个瓶颈:可变大小。 Airflow 变量是键值对。键的长度为 - 256。存储在元数据中的值将受到元数据数据库支持的字符串大小的限制。 https://github.com/apache/airflow/blob/master/airflow/models/variable.py https://github.com/apache/airflow/blob/master/airflow/models/base.py

在我的例子中,我使用的是 Amazon MWAA以及与 aws 使用的底层元数据数据库及其结构相关的详细信息可能很难找到(实际上我并没有尝试进行大量调查)。所以我只是执行了一个压力测试,在变量中强制输入大量数据,看看会发生什么。下面是结果:

<表类="s-表"><头>数据量结果<正文>~0,5 MB(当前)写入和读取操作没有问题。~50 MB (x100)写入和读取操作没有问题。~125 MB (x250)写入和读取操作没有问题,但是使用 airflow 的 Web 控制台无法访问变量部分。服务器返回错误502“Bad gateway”~250 MB (x500)写入变量失败

第二次尝试:S3文件作为缓存

Airflow 变量有一个限制,正如之前的测试所示,所以我尝试保持相同的模式,使用 S3 文件更改 Airflow 变量,考虑到 S3 没有,这对我的特定用例很有效' Airflow 变量没有空间限制。

总结一下:

  1. 我创建了一个名为“sync_db_cache_dag”的 dag,它每小时使用 api_getDBInfo() 检索的数据更新 S3“db_cache.json”。数据以 JSON 格式存储。
  2. 脚本“dags_builder_dag.py”现在从“db_cache.json”中检索数据,这样数据库就无需连续调用“api_getDBInfo”。

最佳答案

您可以尝试以下步骤。

  • 创建一个变量,用于保存任务配置和要创建的任务数量。

创建一个以设定频率触发的 DAG。 dag 有两个任务。

  • 任务 1 读取数据库并填充变量。
  • 任务 2 读取变量并创建多个任务。

关于python - 使用数据库信息构建动态 DAG,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/66528113/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com