gpt4 book ai didi

python - 完善如何避免重新运行任务

转载 作者:行者123 更新时间:2023-12-03 14:13:57 24 4
gpt4 key购买 nike

在 Prefect 中,假设我有一些管道,它为列表中的每个日期运行 f(date),并将其保存到文件中。这是一个非常常见的 ETL 操作。在 Airflow 中,如果我运行一次,它将回填所有历史日期。如果我再次运行它,它会知道任务已经运行,并且只运行任何已经出现的新任务(即最新日期)。
在 Prefect 中,据我所知,它每天都会运行整个管道,即使 99% 的任务在前一天完成。在不切换到 Prefect Cloud 的情况下,有哪些解决方案可以解决这个问题?在退出之前,您是否只是在 redis 中执行诸如使每个任务缓存已完成的操作?

最佳答案

Prefect 有许多一流的缓存处理方式,这取决于您想要多少控制。对于每个任务,您可以指定是否应缓存结果、应将其缓存多长时间以及应如何使缓存失效(年龄、任务的不同输入、流参数值等)。
缓存任务的最简单方法是使用 targets ,它允许您指定任务具有可模板化的副作用(通常是本地或云存储中的文件,但可以是例如数据库条目、redis key 或其他任何内容)。在任务运行之前,它会检查副作用是否存在,如果存在,则跳过运行。
例如,此任务将其结果写入本地文件,自动以任务名称和当前日期为模板:

@task(result=LocalResult(), target="{task_name}-{today}")
def get_data():
return [1, 2, 3, 4, 5]
只要匹配文件存在,任务就不会重新运行。因为 {today}是目标名称的一部分,它将隐式缓存一天的任务值。您还可以使用模板中的参数(如回填日期)来复制 Airflow 的行为。
如需更多控制,您可以使用 Prefect 的 full cache mechanism通过设置 cache_for , cache_validator , 和 cache_key在任何任务上。如果设置,任务将在 Cached 中完成状态而不是 Success状态。当与合适的编排后端(如 Prefect Server 或 Prefect Cloud)搭配使用时, Cached state 可以通过 future 运行相同的任务(或任何具有相同 cache_key 的任务)来查询。 future 的任务将返回 Cached状态作为它自己的结果。

关于python - 完善如何避免重新运行任务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63419097/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com