gpt4 book ai didi

python - 如何在失败时恢复 Prefect 流程而不必重新运行整个流程?

转载 作者:行者123 更新时间:2023-12-03 16:33:32 25 4
gpt4 key购买 nike

TL; 博士;
我无法使用级长的 FlowRunner来解决上面的问题。我可能要么用错了(见下文)要么错过了一些东西。真的很感激任何指点!

问题
我通读了精彩prefect core documentation并在 Handling Failure 上找到了这些部分和 Local Debugging与此最相关(可能错过了一些东西!)。 FlowRunner class 似乎(对我来说)是解决方案。
要查看是否可以使用 Flow Runner 恢复失败的流:

  • 运行失败的流程:

  • from time import sleep

    import prefect
    from prefect import Flow, task


    @task
    def success():
    sleep(3)
    return


    @task
    def failure():
    return 1 / 0


    def get_flow_runner():
    with Flow("Success/Failure") as flow:

    success()
    failure()

    return prefect.engine.FlowRunner(flow)
  • 在 iPython 中运行并保存状态:

  • In [1]: run nameofscript.py
    In [2]: flow_runner = get_flow_runner()
    In [3]: state = flow_runner.run()
  • failure() 中将 1/0 替换为 1/1所以任务会成功:
  • 最后把之前的状态传给了flow_runner希望它能恢复流程:

  • In [1]: run nameofscript.py
    In [2]: flow_runner = get_flow_runner()
    In [3]: flow_runner.run(task_states=state.result)
    整个流程再次运行,包括 3 秒成功的任务。

    最佳答案

    这里的问题是您每次运行都在重建 Flow,这会更改 Task 对象。 state.result是一个字典,其键是 Task 对象 - 如果底层 Task 对象以任何方式发生变化,那么它的哈希值也会发生变化。您应该改为使用更新的 Task 对象手动创建状态字典,如下所示:

    from prefect.engine.state import Success

    failure_task = runner.flow.get_tasks(name="failure")[0]
    task_states = {failure_task: Success("Mocked success")}

    关于python - 如何在失败时恢复 Prefect 流程而不必重新运行整个流程?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63112736/

    25 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com