gpt4 book ai didi

dask - 按顺序迭代一个 dask 包

转载 作者:行者123 更新时间:2023-12-04 03:07:10 29 4
gpt4 key购买 nike

我需要将一个非常大的 dask.bag 的元素提交到一个非线程安全的存储区,即我需要类似的东西

for x in dbag:
store.add(x)

我无法使用compute,因为包太大,无法放入内存。我需要更像 distributed.as_completed 的东西,但它适用于袋子,而 distributed.as_completed 则不行。

最佳答案

我可能会继续使用普通计算,但会加一把锁

def commit(x, lock=None):
with lock:
store.add(x)

b.map(commit, lock=my_lock)

您可以在哪里创建 threading.Lockmultiprocessing.Lock,具体取决于您正在进行的处理类型

如果你想使用 as_completed,你可以将你的包转换为 futures 并在它们上使用 as_completed。

from distributed.client import futures_of, as_completed
b = b.persist()
futures = futures_of(b)

for future in as_completed(futures):
for x in future.result():
store.add(x)

您还可以转换为数据框,我相信它确实会更明智地进行迭代

df = b.to_dataframe(...)
for x in df.iteritems(...):
...

关于dask - 按顺序迭代一个 dask 包,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47882748/

29 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com