gpt4 book ai didi

slack - SlackAPIPostOperator 中的 Airflow UI 链接?

转载 作者:行者123 更新时间:2023-12-04 02:22:26 26 4
gpt4 key购买 nike

我正在使用 SlackAPIPostOperator在 Airflow 中,在任务失败时发送 Slack 消息。
我想知道是否有一种聪明的方法可以将失败任务的 Airflow UI 日志页面的链接添加到 slack 消息。

以下是我想要实现的示例:

http://myserver-uw1.myaws.com:8080/admin/airflow/graph?execution_date=...&arrange=LR&root=&dag_id=MyDAG&_csrf_token=mytoken

当前消息是:

def slack_failed_task(context):
failed_alert = SlackAPIPostOperator(
task_id='slack_failed',
channel="#mychannel",
token="...",
text=':red_circle: Failure on: ' +
str(context['dag']) +
'\nRun ID: ' + str(context['run_id']) +
'\nTask: ' + str(context['task_instance']))
return failed_alert.execute(context=context)

最佳答案

您可以使用配置值 base_url 构建 UI 的 url下[webserver]部分,然后使用 Slack 的 message format <http://example.com|stuff>对于链接。

from airflow import configuration

def slack_failed_task(context):
link = '<{base_url}/admin/airflow/log?dag_id={dag_id}&task_id={task_id}&execution_date={execution_date}|logs>'.format(
base_url=configuration.get('webserver', 'BASE_URL'),
dag_id=context['dag'].dag_id,
task_id=context['task_instance'].task_id,
execution_date=context['ts'])) # equal to context['execution_date'].isoformat())

failed_alert = SlackAPIPostOperator(
task_id='slack_failed',
channel="#mychannel",
token="...",
text=':red_circle: Failure on: ' +
str(context['dag']) +
'\nRun ID: ' + str(context['run_id']) +
'\nTask: ' + str(context['task_instance']) +
'\nSee ' + link + ' to debug')
return failed_alert.execute(context=context)

关于slack - SlackAPIPostOperator 中的 Airflow UI 链接?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48152871/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com