gpt4 book ai didi

python - 当外部任务失败时, Airflow 外部任务传感器不会失败

转载 作者:行者123 更新时间:2023-12-04 02:30:54 25 4
gpt4 key购买 nike

我试图使用 ExternalTaskSensor在 Airflow 1.10.11 中管理一些 dag 的坐标。我开发了这个代码来测试功能:

import time
from datetime import datetime, timedelta
from pprint import pprint

from airflow import DAG
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.sensors.external_task_sensor import ExternalTaskSensor
from airflow.utils.state import State

sensors_dag = DAG(
"test_launch_sensors",
schedule_interval=None,
start_date=datetime(2020, 2, 14, 0, 0, 0),
dagrun_timeout=timedelta(minutes=150),
tags=["DEMO"],
)

dummy_dag = DAG(
"test_dummy_dag",
schedule_interval=None,
start_date=datetime(2020, 2, 14, 0, 0, 0),
dagrun_timeout=timedelta(minutes=150),
tags=["DEMO"],
)


def print_context(ds, **context):
pprint(context['conf'])


with dummy_dag:
starts = DummyOperator(task_id="starts", dag=dummy_dag)
empty = PythonOperator(
task_id="empty",
provide_context=True,
python_callable=print_context,
dag=dummy_dag,
)
ends = DummyOperator(task_id="ends", dag=dummy_dag)

starts >> empty >> ends

with sensors_dag:
trigger = TriggerDagRunOperator(
task_id=f"trigger_{dummy_dag.dag_id}",
trigger_dag_id=dummy_dag.dag_id,
conf={"key": "value"},
execution_date="{{ execution_date }}",
)
sensor = ExternalTaskSensor(
task_id="wait_for_dag",
external_dag_id=dummy_dag.dag_id,
external_task_id="ends",
failed_states=["failed", "upstream_failed"],
poke_interval=5,
timeout=120,
)
trigger >> sensor
这个想法是一个 dag 用 TriggerDagRunOperator 触发另一个.这设置了 execution_date为两个 dag 中的相同值。当 dummy_dag 的状态时,这完美地工作最后一个任务, ends , 是 success .
但是,如果我强制中间任务像这样失败:
def print_context(ds, **context):
pprint(context['conf'])
raise Exception('ouch')
传感器未检测到 failedupstream_failed状态,并且它会一直运行直到超时。我正在使用 failed_states参数来指示哪些状态需要被视为失败,但似乎不起作用。
难道我做错了什么?

最佳答案

failed_states在 Airflow 2.0 中添加;你应该把它设置为 ["failed"]如果受监控的 DAG 运行失败,则将传感器配置为使当前的 DAG 运行失败。如果给定任务 ID,它将监视任务状态,否则监视 DAG 运行状态。
不幸的是,在 Airflow 1.x 中,ExternalTaskSensor操作仅将 DAG 运行或任务状态与 allowed_states 进行比较;一旦受监控的 DAG 运行或任务达到允许的状态之一,传感器就会停止,然后始终标记为成功。默认情况下,传感器仅查找 SUCCESS状态,因此没有超时,如果受监控的 DAG 运行失败,它将永远继续戳。如果你把 failedallowed_states列表,它仍然只会将自己标记为成功。
虽然您可以使用超时,但与您一样,如果外部 DAG 运行失败,我需要传感器使其自己的 DAG 运行失败,就好像下一个任务的依赖关系尚未满足一样。不幸的是,这需要您编写自己的传感器。
这是我的实现;它是 ExternalTaskSensor() 的简化版本类,适合我更简单的需求(无需检查特定任务 ID 或除相同执行日期之外的任何其他内容):

from airflow.exceptions import AirflowFailException
from airflow.models import DagRun
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.db import provide_session
from airflow.utils.decorators import apply_defaults
from airflow.utils.state import State

class ExternalDagrunSensor(BaseSensorOperator):
"""
Waits for a different DAG to complete; if the dagrun has failed, this
task fails itself as well.

:param external_dag_id: The dag_id that contains the task you want to
wait for
:type external_dag_id: str
"""

template_fields = ["external_dag_id"]
ui_color = "#19647e"

@apply_defaults
def __init__(self, external_dag_id, *args, **kwargs):
super().__init__(*args, **kwargs)
self.external_dag_id = external_dag_id

@provide_session
def poke(self, context, session=None):
dag_id, execution_date = self.external_dag_id, context["execution_date"]
self.log.info("Poking for %s on %s ... ", dag_id, execution_date)

state = (
session.query(DagRun.state)
.filter(
DagRun.dag_id == dag_id,
DagRun.execution_date == execution_date,
DagRun.state.in_((State.SUCCESS, State.FAILED)),
)
.scalar()
)
if state == State.FAILED:
raise AirflowFailException(
f"The external DAG run {dag_id} {execution_date} has failed"
)
return state is not None
基本传感器实现将调用 poke()方法重复直到它返回 True (或达到可选超时),并通过提高 AirflowFailException任务状态设置为立即失败,不重试。如果它们将被安排运行,则由下游任务配置决定。

关于python - 当外部任务失败时, Airflow 外部任务传感器不会失败,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64226671/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com