gpt4 book ai didi

python-3.x - Airflow - 在任务之间锁定以便一次只运行一个并行任务?

转载 作者:行者123 更新时间:2023-12-03 16:32:48 26 4
gpt4 key购买 nike

我有一个 DAG,它具有三个任务流(licappts、agents、agentpolicy):

enter image description here

为简单起见,我将这三个不同的流称为。流是独立的,仅仅因为 agentpolicy 失败并不意味着其他两个(liceappts 和代理)应该受到其他流失败的影响。

但是对于 sourceType_emr_task_1 任务(即 licappts_emr_task_1、agents_emr_task_1 和 agentpolicy_emr_task_1),我一次只能运行其中一个任务。例如,我不能同时运行 agents_emr_task_1 和 agentpolicy_emr_task_1,即使它们是两个不一定相互关心的独立任务。

如何在 Airflow 中实现此功能?现在我唯一能想到的就是将该任务包装在一个以某种方式锁定全局变量的脚本中,然后如果该变量被锁定,我将让脚本执行 Thread.sleep(60 秒) 或其他操作,然后重试。但这似乎很棘手,我很好奇 Airflow 是否为此提供了解决方案。

如果需要实现这一点,我愿意重组我的 DAG 的顺序。我想做的一件事是对

Dag Starts -> ... -> licappts_emr_task_1 -> agents_emr_task_1 -> agentpolicy_emr_task_1 -> DAG Finished

但我不认为以这种方式组合流,因为例如 agentpolicy_emr_task_1 必须等待其他两个完成才能开始,并且有时 agentpolicy_emr_task_1 在其他两个完成其他任务之前就准备好了。

所以理想情况下,我希望任何 sourceType_emr_task_1 任务首先启动,然后阻止其他任务运行它们的 sourceType_emr_task_1 任务,直到它完成。

更新:

我刚刚想到的另一个解决方案是,如果有一种方法可以让我检查另一个任务的状态,我可以为 sourceType_emr_task_1 创建一个脚本来检查其他两个 sourceType_emr_task_1 任务中是否有任何一个处于运行状态,如果它们它会休眠并定期检查是否没有其他人正在运行,在这种情况下它将启动它的进程。不过,我不是这种方式的忠实粉丝,因为我觉得这可能会导致竞争条件,即两者都读取(同时)没有人正在运行并且都开始运行。

最佳答案

您可以使用 pool确保这些任务的并行度为 1。

对于每个*_emr_task_1任务,设置一个 pool kwarg 类似于 pool=emr_task .

然后进入 webserver -> admin -> pools -> create:
设置名称 Pool匹配您的运算符中使用的池,以及 Slots为 1。

这将确保调度程序只允许任务在该池中排队,最多达到配置的插槽数,而不管 Airflow 其余部分的并行度如何。

关于python-3.x - Airflow - 在任务之间锁定以便一次只运行一个并行任务?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51600181/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com