gpt4 book ai didi

java - 构建需要上下文限制的 Airflow DAG

转载 作者:行者123 更新时间:2023-12-01 11:17:48 24 4
gpt4 key购买 nike

  • 我有一组我想作为 DAG 运行的工作单元( worker )
  • Group1 有 10 个工作人员,每个工作人员从数据库中提取多个表。请注意,每个worker 映射到单个数据库实例,每个worker 总共需要成功处理100 个表才能成功将自己标记为完整
  • Group1 有一个限制,即在所有这 10 个工作人员中,一次不应使用超过 5 个表。例如:
  • Worker1 正在提取 2 个表
  • Worker2 正在提取 2 个表
  • Worker3 正在提取 1 个表
  • Worker4...Worker10 需要等到 Worker1...Worker3 放弃线程
  • Worker4...Worker10 可以在 step1 中的线程释放后立即拾取表
  • 当每个 worker 完成所有 100 个表时,它无需等待就进入 step2。 Step2 没有并发限制

  • 我应该能够创建一个单节点 Group1 来满足节流的需求,并且还具有
  • 10 个独立的工作节点,因此我可以重新启动它们,以防万一其中任何一个失败

  • 我尝试在下图中解释这一点:
    enter image description here
  • 如果任何一个 worker 失败,我可以重新启动它而不会影响其他 worker 。它仍然使用与 Group1 相同的线程池,因此强制执行并发限制
  • 一旦 step1 和 step2 的所有元素都完成,Group1 就会完成
  • Step2 没有任何并发​​措施

  • 如何在 Airflow 中为 Spring Boot Java 应用程序实现这样的层次结构?
    是否可以使用 Airflow 构造来设计这种 DAG,并动态地告诉 Java 应用程序一次可以提取多少表。例如,如果除 Worker1 之外的所有 worker 都完成了,那么 Worker1 现在可以使用所有 5 个可用线程,而其他所有线程都将继续执行 step2。

    最佳答案

    这些约束不能建模为有向无环图,因此不能完全按照描述在 Airflow 中实现。但是,它们可以建模为队列,因此可以使用作业队列框架来实现。这是您的两个选择:

    作为 Airflow DAG 实现次优:

    from airflow.models import DAG
    from airflow.operators.subdag_operator import SubDagOperator
    # Executors that inherit from BaseExecutor take a parallelism parameter
    from wherever import SomeExecutor, SomeOperator

    # Table load jobs are done with parallelism 5
    load_tables = SubDagOperator(subdag=DAG("load_tables"), executor=SomeExecutor(parallelism=5))

    # Each table load must be it's own job, or must be split into sets of tables of predetermined size, such that num_tables_per_job * parallelism = 5
    for table in tables:
    load_table = SomeOperator(task_id=f"load_table_{table}", dag=load_tables)

    # Jobs done afterwards are done with higher parallelism
    afterwards = SubDagOperator(
    subdag=DAG("afterwards"), executor=SomeExecutor(parallelism=high_parallelism)
    )

    for job in jobs:
    afterward_job = SomeOperator(task_id=f"job_{job}", dag=afterwards)

    # After _all_ table load jobs are complete, start the jobs that should be done afterwards

    load_tables > afterwards


    这里的次优方面是,对于 DAG 的前半部分,集群将被 higher_parallelism - 5 未充分利用。 .

    使用作业队列优化实现:

    # This is pseudocode, but could be easily adapted to a framework like Celery

    # You need two queues
    # The table load queue should be initialized with the job items
    table_load_queue = Queue(initialize_with_tables)
    # The queue for jobs to do afterwards starts empty
    afterwards_queue = Queue()

    def worker():

    # Work while there's at least one item in either queue
    while not table_load_queue.empty() or not afterwards_queue.empty():
    working_on_table_load = [worker.is_working_table_load for worker in scheduler.active()]

    # Work table loads if we haven't reached capacity, otherwise work the jobs afterwards
    if sum(working_on_table_load) < 5:
    is_working_table_load = True
    task = table_load_queue.dequeue()
    else
    is_working_table_load = False
    task = afterwards_queue.dequeue()

    if task:
    after = work(task)
    if is_working_table_load:

    # After working a table load, create the job to work afterwards
    afterwards_queue.enqueue(after)

    # Use all the parallelism available
    scheduler.start(worker, num_workers=high_parallelism)

    使用这种方法,集群不会被充分利用。

    关于java - 构建需要上下文限制的 Airflow DAG,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62011013/

    24 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com