gpt4 book ai didi

airflow - Airflow 任务能否在运行时动态生成 DAG?

转载 作者:行者123 更新时间:2023-12-03 16:59:30 25 4
gpt4 key购买 nike

我有一个不规则上传的上传文件夹。对于每个上传的文件,我想生成一个特定于该文件的 DAG。
我的第一个想法是使用 FileSensor 来执行此操作,该文件传感器监视上传文件夹,并以新文件的存在为条件,触发创建单独 DAG 的任务。从概念上讲:

Sensor_DAG (FileSensor -> CreateDAGTask)

|-> File1_DAG (Task1 -> Task2 -> ...)
|-> File2_DAG (Task1 -> Task2 -> ...)
在我最初的实现中, CreateDAGTaskPythonOperator创建 DAG 全局变量,将它们放置在全局命名空间( see this SO answer )中,如下所示:
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.contrib.sensors.file_sensor import FileSensor
from datetime import datetime, timedelta
from pathlib import Path

UPLOAD_LOCATION = "/opt/files/uploaded"

# Dynamic DAG generation task code, for the Sensor_DAG below
def generate_dags_for_files(location=UPLOAD_LOCATION, **kwargs):
dags = []
for filepath in Path(location).glob('*'):
dag_name = f"process_{filepath.name}"
dag = DAG(dag_name, schedule_interval="@once", default_args={
"depends_on_past": True,
"start_date": datetime(2020, 7, 15),
"retries": 1,
"retry_delay": timedelta(hours=12)
}, catchup=False)
dag_task = DummyOperator(dag=dag, task_id=f"start_{dag_name}")

dags.append(dag)

# Try to place the DAG into globals(), which doesn't work
globals()[dag_name] = dag

return dags
然后主 DAG 通过 PythonOperator 调用此逻辑:
# File-sensing DAG
default_args = {
"depends_on_past" : False,
"start_date" : datetime(2020, 7, 16),
"retries" : 1,
"retry_delay" : timedelta(hours=5),
}
with DAG("Sensor_DAG", default_args=default_args,
schedule_interval= "50 * * * *", catchup=False, ) as sensor_dag:

start_task = DummyOperator(task_id="start")
stop_task = DummyOperator(task_id="stop")
sensor_task = FileSensor(task_id="my_file_sensor_task",
poke_interval=60,
filepath=UPLOAD_LOCATION)
process_creator_task = PythonOperator(
task_id="process_creator",
python_callable=generate_dags_for_files,
)
start_task >> sensor_task >> process_creator_task >> stop_task
但这不起作用,因为到时候 process_creator_task运行时,Airflow 已经解析了全局变量。解析时间后的新全局变量无关紧要。
临时解决方案
Airflow dynamic DAG and task Ids ,我可以通过省略 FileSensor 来实现我想要做的事情任务一并让 Airflow 在每个调度程序心跳时生成每个文件的任务,仅执行 generate_dags_for_files 替换 Sensor_DAG :更新:没关系——虽然这确实在仪表板中创建了一个 DAG,但实际执行运行到 "DAG seems to be missing"问题:
generate_dags_for_files()
这确实意味着我无法再使用 poke_interval 调节文件夹轮询的频率。 FileSensor 的参数;相反,Airflow 每次收集 DAG 时都会轮询文件夹。
这是这里最好的模式吗?
其他相关的 StackOverflow 线程
  • Run Airflow DAG for each fileAirflow: Proper way to run DAG for each file :相同的用例,但接受的答案使用两个静态 DAG,大概具有不同的参数。
  • Proper way to create dynamic workflows in Airflow - 接受的答案通过复杂的 XCom 设置动态创建任务,而不是 DAG。
  • 最佳答案

    简而言之:如果任务写在哪里DagBag读取自,是的,但最好避免需要这样做的模式。 您想要在任务中自定义创建的任何 DAG 都应该是静态的、大量参数化的、条件触发的 DAG。 y2k-shubham provides an excellent example of such a setup ,我很感激他在关于这个问题的评论中的指导。
    也就是说,以下是可以完成问题的方法,无论想法多么糟糕 ,在越来越多的笨手笨脚的情况下:

  • 如果您从变量 ( like so ) 动态生成 DAG,请修改该变量。
  • 如果您从配置文件列表动态生成 DAG,请将新的配置文件添加到您从中提取配置文件的任何位置,以便在下一个 DAG 集合中生成新的 DAG。
  • 使用类似 Jinja 模板的东西在 dags/ 中编写一个新的 Python 文件。文件夹。

  • 要在任务运行后保留对任务的访问权限,您必须保持新的 DAG 定义稳定且可在 future 的仪表板更新中访问/ DagBag收藏。否则, the Airflow dashboard won't be able to render much about it.

    关于airflow - Airflow 任务能否在运行时动态生成 DAG?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62962386/

    25 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com