gpt4 book ai didi

amazon-web-services - 在 Apache Airflow DAG 中使用 AWS SES 发送失败的电子邮件

转载 作者:行者123 更新时间:2023-12-04 08:08:13 25 4
gpt4 key购买 nike

每当我的 DAG 中的任务无法运行或重试运行时,我都会尝试让 Airflow 使用 AWS SES 向我发送电子邮件。我也在使用我的 AWS SES 凭证,而不是我的通用 AWS 凭证。

我的当前 Airflow .cfg

[email]
email_backend = airflow.utils.email.send_email_smtp


[smtp]
# If you want airflow to send emails on retries, failure, and you want to use
# the airflow.utils.email.send_email_smtp function, you have to configure an
# smtp server here
smtp_host = emailsmtpserver.region.amazonaws.com
smtp_starttls = True
smtp_ssl = False
# Uncomment and set the user/pass settings if you want to use SMTP AUTH
smtp_user = REMOVEDAWSACCESSKEY
smtp_password = REMOVEDAWSSECRETACCESSKEY
smtp_port = 25
smtp_mail_from = myemail@myjob.com

我的 DAG 中的当前任务旨在故意失败并重试:
testfaildag_library_install_jar_jdbc = PythonOperator(
task_id='library_install_jar',
retries=3,
retry_delay=timedelta(seconds=15),
python_callable=add_library_to_cluster,
params={'_task_id': 'cluster_create', '_cluster_name': CLUSTER_NAME, '_library_path':s3000://fakepath.jar},
dag=dag,
email_on_failure=True,
email_on_retry=True,
email=’myname@myjob.com’,
provide_context=True
)

一切都按设计的那样工作,因为任务重试设定的次数并最终失败,除了没有发送电子邮件。我也检查了上面提到的任务中的日志,从来没有提到过smtp。

我看过类似的问题 here ,但唯一的解决方案对我不起作用。此外,Airflow 的文档,例如他们的示例 here似乎也不适合我。

SES 是否与 Airflow 的 email_on_failure 和 email_on_retry 函数配合使用?

我目前正在考虑使用 on_failure_callback调用 AWS 提供的 Python 脚本的函数 here在失败时发送电子邮件,但这不是此时的首选路线。

谢谢,感谢任何帮助。

最佳答案

--使用 SES 更新 6/8

这是我写的关于我们如何让这一切正常工作的文章。这个答案的底部有一个小总结。

几个重点:

  • 我们决定不使用 Amazon SES,而是使用 sendmail 我们现在已经启动并运行了 SES。
  • email_on_failure 提供服务的是 Airflow worker 和 email_on_retry特征。你可以做journalctl –u airflow-worker –f在 Dag 运行期间监控它。在您的生产服务器上,您无需在更改 airflow.cfg 后重新启动 Airflow 工作器。使用新的 smtp 设置 - 应该会自动提取。无需担心弄乱当前正在运行的 Dag。

  • 以下是有关如何使用 sendmail 的技术文章:

    由于我们在本地主机上从 ses 更改为 sendmail,因此我们必须更改 airflow.cfg 中的 smtp 设置。 .

    新配置是:
    [email]
    email_backend = airflow.utils.email.send_email_smtp


    [smtp]
    # If you want airflow to send emails on retries, failure, and you want to use
    # the airflow.utils.email.send_email_smtp function, you have to configure an
    # smtp server here
    smtp_host = localhost
    smtp_starttls = False
    smtp_ssl = False
    # Uncomment and set the user/pass settings if you want to use SMTP AUTH
    #smtp_user = not used
    #smtp_password = not used
    smtp_port = 25
    smtp_mail_from = myjob@mywork.com

    这适用于生产和本地 Airflow 实例。

    如果他们的配置不像我上面的那样,人们可能会收到一些常见的错误:
  • socket.error: [Errno 111] Connection refused -- 您必须更改您的 smtp_host线在 airflow.cfglocalhost
  • smtplib.SMTPException: STARTTLS extension not supported by server. -- 您必须更改您的 smtp_starttlsairflow.cfgFalse

  • 在我的本地测试中,我试图简单地强制 Airflow 显示尝试发送电子邮件时发生的事情的日志——我创建了一个假 dag,如下所示:
    # Airflow imports
    from airflow import DAG
    from airflow.operators.python_operator import PythonOperator
    from airflow.operators.bash_operator import BashOperator
    from airflow.operators.dummy_operator import DummyOperator

    # General imports
    from datetime import datetime,timedelta

    def throwerror():
    raise ValueError("Failure")

    SPARK_V_2_2_1 = '3.5.x-scala2.11'

    args = {
    'owner': ‘me’,
    'email': ['me@myjob'],
    'depends_on_past': False,
    'start_date': datetime(2018, 5,24),
    'end_date':datetime(2018,6,28)
    }

    dag = DAG(
    dag_id='testemaildag',
    default_args=args,
    catchup=False,
    schedule_interval="* 18 * * *"
    )

    t1 = DummyOperator(
    task_id='extract_data',
    dag=dag
    )

    t2 = PythonOperator(
    task_id='fail_task',
    dag=dag,
    python_callable=throwerror
    )

    t2.set_upstream(t1)

    如果你做 journalctl -u airflow-worker -f ,您可以看到工作人员说它已向您的 DAG 中的电子邮件发送了有关失败的警报电子邮件,但我们仍未收到该电子邮件。然后我们决定通过做 cat /var/log/maillog 来查看 sendmail 的邮件日志。 .我们看到了这样的日志:
    Jun  5 14:10:25 production-server-ip-range postfix/smtpd[port]: connect from localhost[127.0.0.1]
    Jun 5 14:10:25 production-server-ip-range postfix/smtpd[port]: ID: client=localhost[127.0.0.1]
    Jun 5 14:10:25 production-server-ip-range postfix/cleanup[port]: ID: message-id=<randomMessageID@production-server-ip-range-ec2-instance>
    Jun 5 14:10:25 production-server-ip-range postfix/smtpd[port]: disconnect from localhost[127.0.0.1]
    Jun 5 14:10:25 production-server-ip-range postfix/qmgr[port]: MESSAGEID: from=<myjob@mycompany.com>, size=1297, nrcpt=1 (queue active)
    Jun 5 14:10:55 production-server-ip-range postfix/smtp[port]: connect to aspmx.l.google.com[smtp-ip-range]:25: Connection timed out
    Jun 5 14:11:25 production-server-ip-range postfix/smtp[port]: connect to alt1.aspmx.l.google.com[smtp-ip-range]:25: Connection timed out

    所以这可能是最大的“Oh duh”时刻。在这里,我们能够看到我们的 smtp 服务中实际发生了什么。我们使用 telnet 确认我们无法从 gmail 连接到目标 IP 范围。

    我们确定电子邮件正在尝试发送,但 sendmail 服务无法成功连接到 ip 范围。

    我们决定允许 AWS 中端口 25 上的所有出站流量(因为我们的 Airflow 生产环境是一个 ec2 实例),现在它可以成功运行。我们现在可以接收有关失败和重试的电子邮件(提示: email_on_failureemail_on_retry 在您的 DAG API Reference 中默认为 True - 如果您不想,则无需将其放入您的参数中,但在其中明确说明 True 或 False 仍然是一个好习惯)。

    SES 现在可以工作了。这是 Airflow 配置:
    [email]
    email_backend = airflow.utils.email.send_email_smtp


    [smtp]
    # If you want airflow to send emails on retries, failure, and you want to use
    # the airflow.utils.email.send_email_smtp function, you have to configure an
    # smtp server here
    smtp_host = emailsmtpserver.region.amazonaws.com
    smtp_starttls = True
    smtp_ssl = False
    # Uncomment and set the user/pass settings if you want to use SMTP AUTH
    smtp_user = REMOVEDAWSACCESSKEY
    smtp_password = REMOVEDAWSSECRETACCESSKEY
    smtp_port = 587
    smtp_mail_from = myemail@myjob.com (Verified SES email)

    谢谢!

    关于amazon-web-services - 在 Apache Airflow DAG 中使用 AWS SES 发送失败的电子邮件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50645578/

    25 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com