gpt4 book ai didi

python - 启动时自动将数据集添加到 Dask 调度程序

转载 作者:行者123 更新时间:2023-12-05 07:40:07 25 4
gpt4 key购买 nike

长话短说
我想在启动时将数据集预加载到 Dask 分布式调度程序中。

背景
我正在以实时查询方式使用 Dask 和一个比内存更小的数据集。因为它是实时的,所以重要的是工作人员可以相信调度程序始终具有某些可用的数据集 - 即使在启动后立即可用。工作人员始终将整个数据集保存在内存中。

传统上,我通过连接客户端、分散 df 和发布数据集来完成此操作:

df = dd.read_parquet('df.parq')
df = client.persist(df)
client.publish_dataset(flights=dfa)

但这留下了调度程序重新启动并且数据集未加载的可能性。

我知道您可以使用 --preload 在启动时执行脚本,如下所示:

dask-scheduler --preload=scheduler-startup.py

样板代码如下所示:

from distributed.diagnostics.plugin import SchedulerPlugin

class MyPlugin(SchedulerPlugin):
def add_worker(self, scheduler=None, worker=None, **kwargs):
print("Added a new worker at", worker)

def dask_setup(scheduler):
plugin = MyPlugin()
scheduler.add_plugin(plugin)

但我如何说服调度程序在不使用外部客户端的情况下加载我的数据集?

理论上我可能会删除一个启动预填充客户端的子进程,但感觉不太理想:)

调度程序启动中的普通客户端
尝试在调度程序启动时作为客户端连接:

from distributed.diagnostics.plugin import SchedulerPlugin
from dask.distributed import Client

class MyPlugin(SchedulerPlugin):
def add_worker(self, scheduler=None, worker=None, **kwargs):
print("Added a new worker at", worker)

def dask_setup(scheduler):
c = Client(scheduler.address)
df = dd.read_parquet('df.parq')
df = c.persist(df)
c.publish_dataset(flights=dfa)

c = Client(scheduler.address) 处挂起,必须强制终止 (kill -9)

最佳答案

您可以考虑将您的客户端代码添加到在事件循环中运行的异步函数中。这将允许预加载脚本完成,让调度程序启动,然后运行您的客户端代码。您可能需要如下内容:

async def f(scheduler):
client = await Client(scheduler.address)
df = dd.read_parquet(...)
await client.publish_dataset(flights=df)

def dask_setup(scheduler):
scheduler.loop.add_callback(f, scheduler)

关于python - 启动时自动将数据集添加到 Dask 调度程序,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46470578/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com