gpt4 book ai didi

python - 动态改变任务重试次数

转载 作者:行者123 更新时间:2023-12-01 00:17:38 25 4
gpt4 key购买 nike

重试任务可能毫无意义。例如,如果任务是传感器,并且由于凭据无效而失败,那么以后的任何重试都将不可避免地失败。如何定义可以决定重试是否合理的运算符(operator)?

在 Airflow 1.10.6 中,决定是否应重试任务的逻辑位于 airflow.models.taskinstance.TaskInstance.handle_failure 中,因此无法在运算符中定义行为因为这是任务的责任,而不是运算符(operator)的责任。

理想的情况是,handle_failure 方法是在 Operator 端定义的,这样我们就可以根据需要重新定义它。

我发现的唯一解决方法是使用PythonBranchingOperator来“测试”任务是否可以运行。例如,在上述传感器的情况下,检查登录凭据是否有效,然后才将 DAG 流传送到传感器。否则,失败(或分支到另一个任务)。

我对handle_failure的分析正确吗?有更好的解决方法吗?

最佳答案

回答我自己的问题,通过修改所有运算符中可用的 self.retries 实例变量,在 execute 方法中,我们可以动态地强制不再重试。

在以下示例中:

  1. 传感器 0:第一次尝试就会成功
  2. 传感器 1:尝试 4 次后将失败(最多重试 1 + 3 次)
  3. 传感器 2:尝试 1 次后将失败(动态强制不再重试)
from datetime import datetime, timedelta

from airflow import DAG
from airflow.models import BaseOperator


class PseudoSensor(BaseOperator):
def __init__(
self,
s3_status_code_mock,
*args,
**kwargs):
super().__init__(*args, **kwargs)
self.s3_status_code_mock = s3_status_code_mock

def execute(self, context):
# Try to read S3, Redshift, blah blah
pass
# The query returned a status code, that we mock when the Sensor is initialized
if self.s3_status_code_mock == 0:
# Success
return 0
elif self.s3_status_code_mock == 1:
# Error but should retry if I can still can
raise Exception("Retryable error. Won't change retries of operator.")
elif self.s3_status_code_mock == 2:
# Unrecoverable error. Should fail without future retries.
self.retries = 0
raise Exception("Unrecoverable error. Will set retries to 0.")


# A separate function so we don't make the globals dirty
def createDAG():
# Default (but overridable) arguments for Operators instantiations
default_args = {
'owner': 'Supay',
'depends_on_past': False,
'start_date': datetime(2019, 11, 28),
'retry_delay': timedelta(seconds=1),
'retries': 3,
}

with DAG("dynamic_retries_dag", default_args=default_args, schedule_interval=timedelta(days=1), catchup=False) as dag :
# Sensor 0: should succeed in first try
sensor_0 = PseudoSensor(
task_id="sensor_0",
provide_context=True,
s3_status_code_mock=0,
)

# Sensor 1: should fail after 3 tries
sensor_1 = PseudoSensor(
task_id="sensor_1",
provide_context=True,
s3_status_code_mock=1
)

# Sensor 1: should fail after 1 try
sensor_2 = PseudoSensor(
task_id="sensor_2",
provide_context=True,
s3_status_code_mock=2
)

dag >> sensor_0
dag >> sensor_1
dag >> sensor_2

globals()[dag.dag_id] = dag


# Run everything
createDAG()

甘特图显示每个任务的尝试次数 enter image description here

关于python - 动态改变任务重试次数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59201189/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com