gpt4 book ai didi

airflow - 根据 Airflow 中 sql 查询的结果创建动态任务

转载 作者:行者123 更新时间:2023-12-04 13:53:33 25 4
gpt4 key购买 nike

我正在尝试使用 TaskGroup 创建动态任务,将结果保存在变量中。根据数据库查询,该变量每 N 分钟修改一次,但是当第二次修改该变量时,调度程序会崩溃
基本上我需要根据查询中收到的唯一行数创建任务。
以 TaskGroup(f"task") 作为任务:

    data_variable = Variable.get("df")
data = data_variable

try :
if data != False and data !='none':
df = pd.read_json(data)

for field_id in df.field.unique():


task1 = PythonOperator(

)
task2 = PythonOperator(

)


task1 >> task2

except:
pass
有没有办法用任务组来做到这一点?

最佳答案

这是 Airflow 的反模式。
虽然您可以使用 Variable.get("df")在顶级代码中,您不应该这样做。变量/连接/使用任何数据库创建查询的任何其他代码应仅在操作符范围内或使用 Jinja 模板完成。这样做的原因是 Airflow 会定期解析 DAG 文件(如果您没有更改 min_file_process_interval 的默认值,则每 30 秒一次),因此每 30 秒与数据库交互一次的代码将导致该数据库的负载过重。
对于其中一些情况, future 的 Airflow 版本会发出警告(参见 PR)
Airflow 任务应尽可能保持静态(或缓慢变化)。

关于airflow - 根据 Airflow 中 sql 查询的结果创建动态任务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/66637477/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com