gpt4 book ai didi

airflow - Apache Airflow - 获取所有父 task_id

转载 作者:太空宇宙 更新时间:2023-11-04 02:05:57 26 4
gpt4 key购买 nike

假设有以下情况:

[c1, c2, c3] >> child_task

其中所有 c1c2c3child_task 都是运算符并且具有 task_id 分别等于 id1id2id3child_id

任务 child_task 也是一个 PythonOperator,具有 provide_context=Truepython_callable=dummy_func

def dummy_func(**context):
#...

是否有可能在 dummy_func 中检索所有 parent 的 ID(可能通过使用上下文以某种方式浏览 dag)?

本例中的预期结果将是列表 ['id1', 'id2', 'id3']

最佳答案

upstream_task_idsdownstream_task_ids properties of BaseOperator仅用于此目的。

from typing import List
..
parent_task_ids: List[str] = my_task.upstream_task_ids
child_task_ids: List[str] = my_task_downstream_task_ids

但是请注意,使用此属性,您只能获得任务的直接(上游/下游)邻居。为了获得所有祖先或后代 任务,您可以快速编造出古老的图论方法,例如这个 BFS-类似实现

from typing import List, Set
from queue import Queue
from airflow.models import BaseOperator

def get_ancestor_tasks(my_task: BaseOperator) -> List[BaseOperator]:
ancestor_task_ids: Set[str] = set()
tasks_queue: Queue = Queue()
# determine parent tasks to begin BFS
for task in my_task.upstream_list:
tasks_queue.put(item=task)
# perform BFS
while not tasks_queue.empty():
task: BaseOperator = tasks_queue.get()
ancestor_task_ids.add(element=task.task_id)
for _task in task.upstream_list:
tasks_queue.put(item=_task)
# Convert task_ids to actual tasks
ancestor_tasks: List[BaseOperator] = [task for task in my_task.dag.tasks if task.task_id in ancestor_task_ids]
return ancestor_tasks

以上代码段未经测试,但我相信您可以从中获得灵感


引用资料

关于airflow - Apache Airflow - 获取所有父 task_id,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54728513/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com