gpt4 book ai didi

python - Airflow - SubDag 中长时间运行的任务在一小时后标记为失败

转载 作者:太空狗 更新时间:2023-10-29 21:57:57 26 4
gpt4 key购买 nike

我在 airflow 中有一个 SubDAG,它有一个长时间运行的步骤(通常大约 2 小时,但它会根据正在运行的单元而有所不同)。在 1.7.1.3 下,此步骤将始终导致 AIRFLOW-736当其中的所有步骤都成功时,SubDAG 将停止在“运行”状态。我们可以通过在数据库中手动将 SubDagOperator 标记为成功(而不是运行)来解决此问题,因为我们在 SubDAG 之后没有步骤。

我们现在正在测试 Airflow 1.8.1,通过执行以下操作进行升级:

  1. 关闭我们的调度器和工作器
  2. 通过 pip,卸载 airflow 并安装 apache-airflow(版本 1.8.1)
  3. 运行 Airflow 升级b
  4. 运行 Airflow 调度器和工作器

在系统未受影响的情况下,同一个 DAG 现在大约在长时间运行的任务达到 1 小时标记后 100% 的时间失败(尽管奇怪的是,不正好是 3600 秒后 - 它可以是 30 到 90 之间的任何时间小时滴答后的秒数)和消息“执行者报告任务实例完成(失败),尽管任务说它正在运行。任务是否在外部被杀死?”。但是,任务本身继续在 worker 上运行,有增无减。尽管实际任务运行良好,但基于数据库错误地认为任务失败的调度程序(参见 jobs.py 的 this line)之间存在某种分歧。

我已经确认,不知何故, Airflow 数据库的 task_instance 表中的状态为“失败”。因此,我想知道当任务本身仍在运行时,是什么将任务状态设置为失败。

这是触发问题的示例 dag:

from datetime import datetime
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.subdag_operator import SubDagOperator

DEFAULT_ARGS = {'owner': 'jdoe', 'start_date': datetime(2017, 05, 30)}

def define_sub(dag, step_name, sleeptime):
op = BashOperator(
task_id=step_name, bash_command='sleep %i' % sleeptime,queue="model", dag=dag
)
return dag

def gen_sub_dag(parent_name, step_name, sleeptime):
sub = DAG(dag_id='%s.%s' % (parent_name, step_name), default_args=DEFAULT_ARGS)
define_sub(sub, step_name, sleeptime)
return sub

long_runner_parent = DAG(dag_id='long_runner', default_args=DEFAULT_ARGS, schedule_interval=None)

long_sub_dag = SubDagOperator(
subdag=gen_sub_dag('long_runner', 'long_runner_sub', 7500), task_id='long_runner_sub', dag=long_runner_parent
)

最佳答案

如果您确实在使用 Celery 和 Redis,请查看 visibility timeout setting对于 Celery 并将其增加到超出任务的预期结束时间。

虽然我们将 Celery 配置为 tasks-ack-late,但它仍然存在任务消失的问题。我们认为这 a bug在 celery 中。

关于python - Airflow - SubDag 中长时间运行的任务在一小时后标记为失败,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44274381/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com