gpt4 book ai didi

docker - 如何在Prefect流中使用自定义docker存储?

转载 作者:行者123 更新时间:2023-12-02 18:13:07 25 4
gpt4 key购买 nike

我已经设置了一个Dask集群,并且很高兴向其发送基本的Prefect流。
现在,我想做一些更有趣的事情,并在上面放一个自定义docker图像和python库,并在dask集群上执行流程/任务。
我的假设是我可以将dask集群(调度程序和工作程序)留在自己的python环境中(检查所有各种消息传递库在任何地方都有匹配的版本之后)。也就是说,如果Flow是在我的自定义storage中执行的,那么我不需要将我的库添加到这些机器中。
但是,或者我没有正确设置存储,或者假设以上情况并不安全。换句话说,也许当在我的自定义库中腌制对象时,Dask集群确实需要了解我的python库。假设我有一些名为data的通用python库...

import prefect    
from prefect.engine.executors import DaskExecutor
#see https://docs.prefect.io/api/latest/environments/storage.html#docker
from prefect.environments.storage import Docker

#option 1
storage = Docker(registry_url="gcr.io/my-project/",
python_dependencies=["some-extra-public-package"],
dockerfile="/path/to/Dockerfile")
#this is the docker build and register workflow!
#storage.build()

#or option 2, specify image directly
storage = Docker(
registry_url="gcr.io/my-project/", image_name="my-image", image_tag="latest"
)

#storage.build()

def get_tasks():
return [
"gs://path/to/task.yaml"
]

@prefect.task
def run_task(uri):
#fails because this data needs to be pickled ??
from data.tasks import TaskBase
task = TaskBase.from_task_uri(uri)
#task.run()
return "done"

with prefect.Flow("dask-example",
storage = storage) as flow:
#chain stuff...
result = run_task.map(uri=get_tasks())

executor = DaskExecutor(address="tcp://127.0.01:8080")
flow.run(executor=executor)
谁能解释这种/基于docker的工作流程如何/是否应该工作?

最佳答案

您的敏捷 worker 将需要访问您的任务所依赖运行的相同python库。实现此目的的最简单方法是使用与Flow相同的图像来运行dask worker。您可以手动执行此操作,也可以使用DaskCloudProviderEnvironment之类的方法来自动使用同一图像按流运行来创建短暂的Dask群集。

关于docker - 如何在Prefect流中使用自定义docker存储?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62500965/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com