gpt4 book ai didi

airflow - Apache Airflow 任务卡在 'up_for_retry' 状态

转载 作者:行者123 更新时间:2023-12-04 11:47:36 34 4
gpt4 key购买 nike

我一直在我们的系统上设置一个 Airflow 集群,之前它一直在工作。我不确定我可能做了什么来改变这一点。

我有一个想要按计划运行的 DAG。为了确保它正常工作,我还想手动触发它。目前这些似乎都不起作用,并且似乎没有为任务实例写入日志。唯一可用的日志是 Airflow 调度程序日志,它们通常看起来很健康。

我只是不断地收到这条消息:
Task is not ready for retry yet but will be retried automatically. Current date is 2018-12-12T11:34:46.978355+00:00 and task will be retried at 2018-12-12T11:35:08.093313+00:00.
但是,如果我稍等一下,就会再次显示完全相同的消息,只是时间向前移动了一点。因此,该任务似乎从未真正被重试过。

我正在使用 LocalExecutor,任务是 SSHOperator。简化代码如下。它所做的只是将 ssh 连接到另一台机器上,并使用预先确定的目录结构启动一堆应用程序。:

DB_INFO_FILE = 'info.json'
START_SCRIPT = '/bin/start.sh'
TIME_IN_PAST = timezone.convert_to_utc(datetime.today() -
timedelta(days=1))

DEFAULT_ARGS = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': TIME_IN_PAST,
'email': ['some_email@blah.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=1),
}

def _extract_instance_id(instance_string):
return re.findall(r'\d+', instance_string)[0]

def _read_file_as_json(file_name):
with open(file_name) as open_file:
return json.load(open_file)

DB_INFO = _read_file_as_json(DB_INFO_FILE)
CONFIG_CLIENT = ConfigDbClient(**DB_INFO)

APP_DIRS = CONFIG_CLIENT.get_values('%my-app-info%')

INSTANCE_START_SCRIPT_PATHS = {
_extract_instance_id(instance_string): directory+START_SCRIPT
for instance_string, directory in APP_DIRS.items()
}

# Create an ssh hook which refers to pre-existing connection information
# setup and stored by airflow
SSH_HOOK = SSHHook(ssh_conn_id='my-conn-id')

# Create a DAG object to add tasks to
DAG = DAG('my-dag-id',
default_args=DEFAULT_ARGS)

# Create a task for each app instance.
for instance_id, start_script in INSTANCE_START_SCRIPT_PATHS.items():
task = SSHOperator(
task_id='run-script-{0}'.format(instance_id),
command='bash {0}'.format(start_script),
ssh_hook=SSH_HOOK,
dag=DAG)

当我通过命令行而不是通过 UI 单独运行任务时,它可以工作。似乎我可以运行任务,但我根本无法触发 DAG 运行。我已经尝试了许多 start_date 和间隔时间表的组合,只是为了进行完整性检查。

有任何想法吗?

是的,我知道之前有人问过这个问题,我已经看过所有这些问题,但没有一个解决方案对我有帮助。

最佳答案

哦。您的 start_date正在以与计划间隔期相同或更快的速度变化。

这是调度程序每隔几秒钟看到的内容:

start_date: 2018-12-11T12:12:12.000Z  # E.G. IFF now is 2018-12-12T12:12:12.000Z, a day ago
schedule_interval: timedelta(days=1) # The default

以下是调度程序运行 DAG 所需的内容:上次运行发生的时间超过一个调度间隔之前。如果没有 预定 运行已发生,第一次 预定 如果自 start_date 起已经过去了一个完整的计划间隔,则现在可以开始运行因为这是 execution_date 的最早允许日期.在这种情况下 dag_runexecution_date设置为应该创建该间隔期的开始。然后 task_instance只要满足 task_instance 的依赖关系,就可以为 DAG 中的任何任务创建 s。 execution_date是在 start_date 之后DAG 的(这不存储在 dag_run 对象中,而是通过在检查 dag 状态时加载 DAG 文件来重新计算)。

因此,它不会自动安排,因为开始日期在满足间隔时不断变化。但是,如果它是 -2d,则至少会安排一次运行,然后任何进一步的运行都必须等到 1d 之后才能安排。如果你只是设置一个固定的 datetime 会更容易在您的 start_date .

但是手动运行时那些奇怪的重试呢?

您确实开始了一两次手动运行。这些运行将当前时间作为 execution_date除非您指定了其他内容。这应该在 start_date 之后,至少到明天,这应该清除它们运行。但是,在您的日志中,您似乎看到它们失败并被标记为重试,并且不会减少您的重试。我不确定为什么会这样,但可能是 SSHOperator 有问题吗? .

您是否使用 [ssh] 安装了 Airflow ?额外以便在网络服务器和调度程序上满足 SSHOperator 的依赖关系(特别是 paramikosshtunnel )?其中之一正在工作,因为我认为它正在解析并根据添加到数据库而显示在 UI 中。

如果你执行,你会得到什么:
airflow test my-dag-id run-script-an_instance_id 2018-12-12T12:12:12

您知道调度程序和网络服务器正在循环重新填充他们的 DAG 包,因此每天重新运行此 DAG 文件 1000 次,重新加载该 json(它是本地访问,因此类似于导入模块),并重新创建 SSHHook使用数据库查找。我没有看到设置这个钩子(Hook)有什么特别之处,为什么不删除 ssh_hook来自 SSHOperator并将其替换为 ssh_conn_id='my-conn-id'所以它可以在执行时创建一次?
我怀疑这就是导致重试只是向前推进的问题。

关于airflow - Apache Airflow 任务卡在 'up_for_retry' 状态,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53744198/

34 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com