gpt4 book ai didi

airflow - 从 Airflow 中已清除的任务中获取 XComs

转载 作者:行者123 更新时间:2023-12-05 08:26:12 34 4
gpt4 key购买 nike

我有任务失败时的松弛警报,但我也希望有恢复消息。

当任务最初失败时,它会在其 on_failure_callback 中执行 xcom_push。我在此处保存的内容可在下一次 DAG 运行中使用:

context['ti'].xcom_pull(key='my_task_state',
task_ids=context['task'].task_id,
include_prior_dates=True)

但是,如果我清除失败的任务以便它重新运行,在其 on_failure_callback/on_success_callback 中,我会尝试此操作以获取我在初始尝试中保存的值:

context['ti'].xcom_pull(key='my_task_state',
task_ids=context['task'].task_id,
include_prior_dates=False)

这将返回 None。如果我设置 include_prior_dates=True,它将返回上一个 DAG 运行的值,但不会返回任务已清除的当前值。

我是不是做错了什么,或者是否有一种变通方法可以用来获得我正在寻找的 XCom 值?

最佳答案

Yong Wang's answer很好地解释了为什么我无法获得我想要的值。不过,我想出了一个解决方法。

xcom_pushxcom_pull 都只是调用 XCom 上的类方法。你可以直接调用这些。事实证明,您可以使用一个虚构的任务 ID,它会将其保存到该 ID 下的 xcom 表中。由于它不是真正的任务,因此当任务(或 DAG)被清除时它不会被删除。

from airflow.models import XCom

def set_xcom(context, value):
XCom.set(key='my_key',
value=value
task_id='{}_SOME_SUFFIX'.format(context['ti'].task_id),
dag_id=context['ti'].dag_id,
execution_date=context['ti'].execution_date)

def get_xcom(context):
return XCom.get_one(context['ti'].execution_date,
key='my_key',
task_id='{}_SOME_SUFFIX'.format(context['ti'].task_id),
dag_id=context['ti'].dag_id,
include_prior_dates=False)

这不是使用 XCom 的标准方式,所以我以后升级到新版本的 Airflow 时必须小心。

关于airflow - 从 Airflow 中已清除的任务中获取 XComs,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57470630/

34 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com