gpt4 book ai didi

email - Airflow - 2 个警报发送 on_failure

转载 作者:行者123 更新时间:2023-12-03 19:10:19 25 4
gpt4 key购买 nike

我在 Airflow 1.10 上有一个奇怪的错误。
我想尝试在任务失败时发送电子邮件,并在 Microsoft Teams 上发送通知。
我做了一个简单的测试用(dummy)DAG 来尝试一下。
一切正常,但每种通知我都连续收到了 2 次:2 封电子邮件和 2 条 Teams 消息。
对于 Teams,我使用的是:https://github.com/mendhak/Airflow-MS-Teams-Operator
DAG 代码如下:

from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from operators.ms_teams_webhook_operator import MSTeamsWebhookOperator
from airflow.utils.email import send_email_smtp

# Shared task arguments; handed to the DAG constructor as `default_args`.
default_args = dict(
    owner="me",
    depends_on_past=False,
    start_date=datetime(2020, 6, 15),
    email_on_failure=False,
)


def on_failure(context):
    """Failure callback: post a card to MS Teams, then send an e-mail.

    Args:
        context: Mapping handed to the callback. This function reads
            ``context['dag_run']``, ``context['task_instance']`` and
            ``context['ts']``.
    """
    dag_id = context['dag_run'].dag_id
    task_id = context['task_instance'].task_id
    # Disabled: context['task_instance'].xcom_push(key=dag_id, value=True)

    # Link to the log page of the task this callback is reporting on.
    logs_url = f"https://myairflow/admin/airflow/log?dag_id={dag_id}&task_id={task_id}&execution_date={context['ts']}"

    # NOTE(review): an operator is instantiated here only to be executed by
    # hand; constructing an operator inside a callback may also attach it to
    # the current DAG as a task -- confirm this is intended.
    MSTeamsWebhookOperator(
        task_id="msteams_notify_failure",
        trigger_rule="all_done",
        message=f"{dag_id} has failed on task: {task_id}",
        button_text="View log",
        button_url=logs_url,
        theme_color="FF0000",
        http_conn_id='msteams-python-webhook',
    ).execute(context)

    # The same text is used as both e-mail subject and body.
    subject = f"Titre {dag_id} - {task_id}"
    send_email_smtp("gil.felot@lisea.fr", subject, subject)


def print_fail():
    """Print a greeting, then exit with status 1 so the calling task fails.

    Raises:
        SystemExit: Always, with exit code 1.
    """
    # Function-scope import keeps this block independent of the file header.
    import sys

    print("Hello !")
    # sys.exit() instead of the bare exit(): the latter is a convenience that
    # the `site` module installs for interactive sessions and is not meant
    # for use in programs. Both raise SystemExit(1).
    sys.exit(1)


# Manually triggered DAG (schedule_interval=None): a dummy start task followed
# by a Python task that has `on_failure` attached as its failure callback.
with DAG(
    dag_id="test_email2",  # HERE
    schedule_interval=None,
    default_args=default_args,
) as dag:
    preprocessing_started = DummyOperator(task_id="go_email_go")

    python_fail = PythonOperator(
        task_id="pyhton_def",
        python_callable=print_fail,
        email_on_failure=False,
        on_failure_callback=on_failure,
    )

    # Run the dummy task first, then the Python task.
    preprocessing_started >> python_fail

编辑 :
改为使用 Hook 之后,现在什么都没有触发。
from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.email import send_email_smtp
from hooks.ms_teams_webhook_hook import MSTeamsWebhookHook


def on_failure(context):
    """Failure callback: post a card to MS Teams through the hook, then e-mail.

    Args:
        context: Mapping handed to the callback. This function reads
            ``context['dag_run']``, ``context['task_instance']`` and
            ``context['ts']``.
    """
    dag_id = context['dag_run'].dag_id
    task_id = context['task_instance'].task_id
    # Disabled: context['task_instance'].xcom_push(key=dag_id, value=True)

    # Link to the log page of the task this callback is reporting on.
    logs_url = f"https://myairflow/admin/airflow/log?dag_id={dag_id}&task_id={task_id}&execution_date={context['ts']}"

    hook_options = dict(
        http_conn_id='msteams-python-webhook',
        message=f"Le DAG {dag_id} a échoué sur la tâche : {task_id}",
        subtitle="Voir les logs ?",
        button_text="Logs",
        button_url=logs_url,
        theme_color="FF0000",
    )
    MSTeamsWebhookHook(**hook_options).execute()

    # The same text is used as both e-mail subject and body.
    subject = f"Titre {dag_id} - {task_id}"
    send_email_smtp("my@email.fr", subject, subject)


def on_success(context):
    """Callback that announces a successful DAG run on MS Teams and by e-mail.

    Prints a marker line and every ``(key, value)`` pair of ``context`` for
    debugging, posts a card through ``MSTeamsWebhookHook`` and finally sends
    an e-mail whose subject and body are the same text.

    Args:
        context: Mapping handed to the callback. Only ``context['dag_run']``
            is read directly; the remaining entries are just printed.
    """
    print("OK callback")
    dag_id = context['dag_run'].dag_id

    # Debug aid: dump the whole callback context.
    for item in context.items():
        print(item)

    teams_notification_hook = MSTeamsWebhookHook(
        http_conn_id='msteams-python-webhook',
        message=f"Le DAG {dag_id} s'est terminé avec succès",
        theme_color="00EE00",
    )
    # Call execute() without arguments, as the failure callback in this file
    # does. The previous execute(context) passed an extra positional argument;
    # presumably the hook's execute() takes none (unlike an operator's), which
    # would raise TypeError before anything is sent -- verify against the hook.
    teams_notification_hook.execute()

    title = f"Titre {dag_id} - Success"
    body = title

    send_email_smtp("my@email.fr", title, body)


# Shared task arguments; handed to the DAG constructor as `default_args`.
default_args = dict(
    owner="lisea-mesea",
    depends_on_past=False,
    start_date=datetime(2020, 6, 15),
    email_on_failure=False,
    # NOTE(review): the success notifier is wired in as the *failure*
    # callback; the real failure handler is left disabled below --
    # presumably a temporary test setup, confirm before reuse.
    on_failure_callback=on_success,
    # on_failure_callback=on_failure
)


def print_fail():
    """Print a greeting, then exit with status 1 so the calling task fails.

    Raises:
        SystemExit: Always, with exit code 1.
    """
    # Function-scope import keeps this block independent of the file header.
    import sys

    print("Hello !")
    # sys.exit() instead of the bare exit(): the latter is a convenience that
    # the `site` module installs for interactive sessions and is not meant
    # for use in programs. Both raise SystemExit(1).
    sys.exit(1)


# Manually triggered DAG (schedule_interval=None): a dummy start task followed
# by a Python task. No per-task failure callback is set here; the one in
# `default_args` is the only callback configured.
with DAG(
    dag_id="test_email2",  # HERE
    schedule_interval=None,
    default_args=default_args,
) as dag:
    preprocessing_started = DummyOperator(task_id="go_email_go")

    python_fail = PythonOperator(
        task_id="pyhton_def",
        python_callable=print_fail,
        # on_failure_callback=on_failure,
        email_on_failure=False,
    )

    # Run the dummy task first, then the Python task.
    preprocessing_started >> python_fail

最佳答案

我强烈建议在 on_failure_callback 中使用 MSTeamsWebhookHook,而不是 MSTeamsWebhookOperator。
不深入细节地说,MSTeamsWebhookOperator 所继承的 BaseOperator 在被实例化时,会尝试把自己挂接到当前的 DAG 上。
从 dag 属性的 setter 可以看出,它会在 DAG 上注册这个任务实例。
这意味着,除了在 on_failure_callback 中手动执行该 Operator 之外,还会有一个 MSTeamsWebhookOperator 的任务实例被调度。
后者并非本意,因为我们只需要在任务失败时,利用 Hook 提供的功能发送通知。
这并没有解释为什么电子邮件被发送两次,因为这似乎没有使用 Operator。这需要单独调查。

关于email - Airflow - 2 个警报发送 on_failure,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62427954/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com