gpt4 book ai didi

Airflow DAG : Customized Email on any of the Task failure

转载 作者:行者123 更新时间:2023-12-02 15:00:45 26 4
gpt4 key购买 nike

是否有任何选项自定义电子邮件并在 DAG 中出现任何任务失败时发送。有一个类似 'email_on_failure': True 的选项,但这不提供动态添加内容到电子邮件主题或正文的选项。

我的 DAG 将如下所示

import airflow

from airflow import DAG
from airflow.contrib.operators.databricks_operator import DatabricksSubmitRunOperator
from airflow.operators.email_operator import EmailOperator
from airflow.operators.bash_operator import BashOperator
from airflow.operators.http_operator import SimpleHttpOperator
from airflow.operators.sensors import HttpSensor
import json
from datetime import timedelta
from datetime import datetime
from airflow.models import Variable

args = {
'owner': 'airflow',
'email': ['test@gmail.com'],
'email_on_failure': True,
'email_on_retry': True,
'depends_on_past': False,
'start_date': airflow.utils.dates.days_ago(0),
'max_active_runs':10
}

dag = DAG(dag_id='TEST_DAG', default_args=args, schedule_interval='@once')

new_cluster = {
'spark_version': '4.0.x-scala2.11',
'node_type_id': 'Standard_D16s_v3',
'num_workers': 3,
'spark_conf':{
'spark.hadoop.javax.jdo.option.ConnectionDriverName':'org.postgresql.Driver',
.....
},
'custom_tags':{
'ApplicationName':'TEST',
.....
}
}

t1 = DatabricksSubmitRunOperator(
task_id='t1',
dag=dag,
new_cluster=new_cluster,
......
)

t2 = SimpleHttpOperator(
task_id='t2',
method='POST',
........
)

t2.set_upstream(t1)

t3 = SimpleHttpOperator(
task_id='t3',
method='POST',
.....
)

t3.set_upstream(t2)

send_mail = EmailOperator (
dag=dag,
task_id="send_mail",
to=["test@gmail.com"],
subject=" Success",
html_content='<h3>Success</h3>')

send_mail.set_upstream(t3)

成功案例send_mail任务将发送自定义电子邮件到指定的电子邮件ID。

但万一任务失败,我想自定义电子邮件并发送到指定的电子邮件 ID。但这并没有发生,并且在失败的情况下,电子邮件将使用默认主题和正文发送

如有任何帮助,我们将不胜感激

最佳答案

我为此使用了on_failure_callback。请注意,DAG 中每个失败的任务都会触发它。

def report_failure(context):
# include this check if you only want to get one email per DAG
if(task_instance.xcom_pull(task_ids=None, dag_id=dag_id, key=dag_id) == True):
logging.info("Other failing task has been notified.")
send_email = EmailOperator(...)
send_email.execute(context)

'''

dag = DAG(
...,
default_args={
...,
"on_failure_callback": report_failure
}
)

关于 Airflow DAG : Customized Email on any of the Task failure,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51726248/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com