gpt4 book ai didi

dask - 从 Dask Workers 写入 Redis

转载 作者:行者123 更新时间:2023-12-02 09:36:29 26 4
gpt4 key购买 nike

假设我有一个函数可以进行一些处理并将结果存储到 Redis 服务器

r = redis.StrictRedis()

def process(data):
(...do some work...)
r.put(...)

现在我有大量数据,我想使用 dask 来并行化该过程。类似的东西

from dask.distributed imoprt Client
client = Client()
for x in data:
client.submit(process,x)

但我得到 KeyError(<function process>) 。有什么想法吗?

编辑

它根据下面的 @mrocklin 答案工作,将连接初始化放置在函数内。我认为随着 worker 的加入和离开,这种联系将会被破坏并重新建立。如果我重写函数来接受一批数据,效率会不会更高。

def process(batches_data):
r = redis.StrictRedis()
for batch in batches_data:
(...do some work...)
r.put(...)

最佳答案

我的第一个猜测是你的对象r序列化得不好。这是相当典型的,因为具有实时连接的对象通常拒绝序列化(有充分的理由)。

相反,您可以尝试在函数内建立连接

def process(data):
r = redis.StrictRedis()
... do some work
r.put(...)

此外,我建议您持有submit产生的 future 。否则 Dask 会假设您不再关心这些任务并决定忽略它们

futures = [client.submit(process, x) for x in L]
wait(futures)

如果这不能解决您的问题,那么我建议使用更完整的异常和回溯来编辑您的原始问题。

关于dask - 从 Dask Workers 写入 Redis,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41096874/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com