
python - on_failure_callback triggered multiple times


I want to publish a single Kafka message when parallel Airflow tasks fail. My Airflow DAG looks like the following.

from datetime import datetime, timedelta
from airflow.models import Variable
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python_operator import PythonOperator


def task_failure_callback(context):
    ti = context['task_instance']
    print(f"task {ti.task_id} failed in dag {ti.dag_id}, error: {ti.xcom_pull(key='error')}")
    # call function to publish kafka message


def task_success_callback(context):
    ti = context['task_instance']
    print(f"Task {ti.task_id} has succeeded in dag {ti.dag_id}.")
    # call function to publish kafka message


def dag_success_callback(context):
    dag_status = f"DAG has succeeded, run_id: {context['run_id']}"
    print(dag_status)
    Variable.set("TEST_CALLBACK_DAG_STATUS", dag_status)
    # call function to publish kafka message


def dag_failure_callback(context):
    ti = context['task_instance']
    dag_status = f"DAG has failed, run_id: {context['run_id']}, task id: {ti.task_id}"
    print(dag_status)
    Variable.set("TEST_CALLBACK_DAG_STATUS", dag_status)
    # call function to publish kafka message


def user_func1(ti):
    try:
        input_val = int(Variable.get("TEST_CALLBACK_INPUT", 0))
        if input_val % 10 == 0:
            raise ValueError("Invalid Input")
    except Exception as e:
        ti.xcom_push(key="error", value=str(e))
        raise e


def user_func2(ti):
    try:
        input_val = int(Variable.get("TEST_CALLBACK_INPUT", 0))
        if input_val % 2 == 0:
            raise ValueError("Invalid Input")
    except Exception as e:
        ti.xcom_push(key="error", value=str(e))
        raise e


default_args = {
    "on_success_callback": None,
    "on_failure_callback": dag_failure_callback,
}

with DAG(
    dag_id="test_callbacks_dag",
    default_args=default_args,
    schedule_interval=None,
    start_date=datetime(2021, 1, 1),
    dagrun_timeout=timedelta(minutes=60),
    catchup=False,
) as dag:

    task1 = PythonOperator(task_id="task1", python_callable=user_func1)
    task2 = PythonOperator(task_id="task2", python_callable=user_func2)
    task3 = DummyOperator(task_id="task3", on_success_callback=task_success_callback)
    [task1, task2] >> task3
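
The "# call function to publish kafka message" placeholders above stand in for a small publishing helper, roughly like the following sketch (using the kafka-python package; the broker address and topic name here are illustrative, not part of my actual setup):

import json

from kafka import KafkaProducer  # assumes the kafka-python package is installed


def publish_kafka_message(payload: dict, topic: str = "airflow-events") -> None:
    # Serialize the payload as JSON and publish it to the given topic.
    producer = KafkaProducer(
        bootstrap_servers="localhost:9092",  # hypothetical broker address
        value_serializer=lambda v: json.dumps(v).encode("utf-8"),
    )
    producer.send(topic, payload)
    producer.flush()  # block until the message has actually been sent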


Airflow logs from the parallel task failures:

[2022-10-08, 00:10:51 IST] {logging_mixin.py:115} INFO - DAG has failed, run_id: manual__2022-10-07T18:40:50.355282+00:00, task id: task1

[2022-10-08, 00:10:51 IST] {logging_mixin.py:115} INFO - DAG has failed, run_id: manual__2022-10-07T18:40:50.355282+00:00, task id: task2

As shown above, task1 and task2 run in parallel, and I use callback functions to publish the corresponding Kafka messages. The success scenario works: a single success message is published from the final task. The problem is the failure scenario, specifically when tasks fail while running in parallel. If task1 and task2 both fail during a parallel run, Airflow fires on_failure_callback twice, once for task1 and once for task2. I accept that this is Airflow's intended behavior, but for my requirement I do not want multiple on_failure_callback invocations. Once the first on_failure_callback has fired, the next one should not fire, because the receiving side is designed to handle a single error event, not multiple or batched errors.

I put the Kafka publishing call inside the on_failure_callback function (dag_failure_callback). If task1 fails, it publishes a message to the Kafka topic; if task2 also fails, it publishes a second message to the same topic. I cannot handle this, because the two tasks are both parallel and independent. I want to stop after the first Kafka publish and not publish messages for any further failures.

Please suggest how to limit on_failure_callback to a single invocation when parallel tasks fail.

Best Answer

You can use a trigger_rule together with a PythonOperator to handle the failed tasks. Here is an example:

import logging

import pendulum
from airflow import DAG
from airflow.models import TaskInstance
from airflow.operators.python import PythonOperator
from airflow.utils.state import TaskInstanceState
from airflow.utils.trigger_rule import TriggerRule

dag = DAG(
    dag_id='test',
    start_date=pendulum.today('UTC').add(hours=-1),
    schedule_interval=None,
)


def green_task(ti: TaskInstance, **kwargs):
    logging.info('green')


def red_task(ti: TaskInstance, **kwargs):
    raise Exception('red')


def check_tasks(ti: TaskInstance, **kwargs):
    # find failed tasks. do what you need...
    for task in ti.get_dagrun().get_task_instances(state=TaskInstanceState.FAILED):  # type: TaskInstance
        logging.info(f'failed dag: {task.dag_id}, task: {task.task_id}. url: {task.log_url}')


t1 = PythonOperator(
    dag=dag,
    task_id='green_task',
    python_callable=green_task,
    provide_context=True,
)

t2 = PythonOperator(
    dag=dag,
    task_id='red_task1',
    python_callable=red_task,
    provide_context=True,
)

t3 = PythonOperator(
    dag=dag,
    task_id='red_task2',
    python_callable=red_task,
    provide_context=True,
)

check = PythonOperator(
    dag=dag,
    task_id='check',
    python_callable=check_tasks,
    provide_context=True,
    trigger_rule=TriggerRule.NONE_SKIPPED,
)

t1 >> check
t2 >> check
t3 >> check

Run the tasks and look at the check task's logs:

[2022-10-10, 15:12:39 UTC] {dag_test.py:27} INFO - failed dag: test, task: red_task1. url: http://localhost:8080/log?execution_date=2022-10-10T14%3A49%3A57.530923%2B00%3A00&task_id=red_task1&dag_id=test&map_index=-1
[2022-10-10, 15:12:39 UTC] {dag_test.py:27} INFO - failed dag: test, task: red_task2. url: http://localhost:8080/log?execution_date=2022-10-10T14%3A49%3A57.530923%2B00%3A00&task_id=red_task2&dag_id=test&map_index=-1
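
Since check runs once per DAG run, it is also a natural place to publish the single failure message the question asks for. A minimal sketch of such a check_tasks, reusing the hypothetical publish_kafka_message helper sketched in the question:

from airflow.models import TaskInstance
from airflow.utils.state import TaskInstanceState


def check_tasks(ti: TaskInstance, **kwargs):
    # Collect every failed task instance of the current DAG run.
    failed = ti.get_dagrun().get_task_instances(state=TaskInstanceState.FAILED)
    if not failed:
        return  # nothing failed, nothing to publish

    # One message summarizing all failures, instead of one message per failed task.
    publish_kafka_message({
        'dag_id': ti.dag_id,
        'run_id': kwargs['run_id'],
        'failed_tasks': [t.task_id for t in failed],
    })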

Or you can move the handling into on_failure_callback:

def on_failure_callback(context):
    ti = context['task_instance']  # type: TaskInstance
    for task in ti.get_dagrun().get_task_instances(state=TaskInstanceState.FAILED):  # type: TaskInstance
        ...  # do what you need with each failed task
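
Note that on_failure_callback still fires once per failed task, so with this approach you need a once-per-run guard to publish only once. A minimal sketch using an Airflow Variable keyed by run_id (publish_kafka_message is the hypothetical helper from above; the check-then-set is not strictly atomic, so two callbacks racing at the same instant could in rare cases still both publish):

from airflow.models import Variable


def dag_failure_callback(context):
    ti = context['task_instance']
    run_id = context['run_id']
    guard_key = f"failure_msg_sent__{ti.dag_id}__{run_id}"  # one guard per DAG run

    # Variable.get raises KeyError when the key is absent, so only the first
    # callback to reach this point creates the guard and publishes.
    try:
        Variable.get(guard_key)
        return  # another task's callback already published for this run
    except KeyError:
        Variable.set(guard_key, "sent")

    publish_kafka_message({'dag_id': ti.dag_id, 'run_id': run_id, 'task_id': ti.task_id})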

Regarding python - on_failure_callback triggered multiple times, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/73991675/
