gpt4 book ai didi

python - 如何将 dask.dataframe 预缓存到所有 worker 和分区以减少通信需求

转载 作者:太空宇宙 更新时间:2023-11-04 04:06:27 26 4
gpt4 key购买 nike

有时使用 dask.dataframe.map_partitions 进行合并等操作很有吸引力。在某些情况下,当使用 map_partitionsleft_dfright_df 之间进行合并时,我想基本上预缓存 right_df 在执行合并之前减少网络开销/本地洗牌。有什么明确的方法可以做到这一点?感觉应该可以使用 client.scatter(the_df)client.run(func_to_cache_the_df) 之一或组合,或其他一些智能广播。

在对大型 left_df 与本质上是查找表的小得多的 right_df 进行左联接的上下文中,这一点尤为突出。感觉这个 right_df 应该能够读入内存并持久化/分散到所有 worker/partitions pre-merge 以减少跨分区通信的需要,直到最后。我怎样才能分散 right_df 来成功地做到这一点?

以下是使用 cuDF 和 Dask 进行这种不平衡合并的较小示例(但从概念上讲,这与 pandas 和 Dask 相同):

import pandas as pd
import cudf
import dask_cudf
import numpy as np
from dask.distributed import Client
from dask_cuda import LocalCUDACluster

# create a local CUDA cluster
cluster = LocalCUDACluster()
client = Client(cluster)

np.random.seed(12)

nrows_left = 1000000
nrows_right = 1000

left = cudf.DataFrame({'a': np.random.randint(0,nrows_right,nrows_left), 'left_value':np.arange(nrows_left)})
right = cudf.DataFrame({'a': np.arange(nrows_right), 'lookup_val': np.random.randint(0,1000,nrows_right)})

print(left.shape, right.shape) # (1000000, 2) (1000, 2)

ddf_left = dask_cudf.from_cudf(left, npartitions=500)
ddf_right = dask_cudf.from_cudf(right, npartitions=2)

def dask_merge(L, R):
return L.merge(R, how='left', on='a')

result = ddf_left.map_partitions(dask_merge, R=ddf_right).compute()
result.head()
<cudf.DataFrame ncols=3 nrows=5 >
a left_value lookup_val
0 219 1952 822
1 873 1953 844
2 908 1954 142
3 290 1955 810
4 863 1956 910

最佳答案

如果您执行以下任一操作,则一切正常:

  • 与单分区 dask 数据框合并
  • 与非 dask 数据框(如 Pandas 或 cuDF)合并
  • 具有非 dask 数据框(如 Pandas 或 cuDF)的 map_partitions

这是怎么回事:

  1. 单个分区被推送给单个工作人员
  2. 在执行过程中,一些 worker 将复制该数据,然后其他人将从这些 worker 复制数据,依此类推,将数据传递到树中
  3. 工作人员将按预期进行合并

这与预期的一样快。但是,如果您正在做基准测试之类的事情,并且想要将步骤 1、2 和 3 分开,那么您可以使用 client.replicate:

left = ... # multi-partition dataframe
right = ... # single-partition dataframe
right = right.persist() # make sure it exists in one worker
client.replicate(right) # replicate it across many workers

... proceed as normal

这不会更快,但步骤 1、2 将被拉出到复制步骤中。

在您的示例中,right 看起来有两个分区。您可能想将其更改为一个。 Dask 采用不同的代码路径,在这种情况下,它本质上只是 map_partitions

关于python - 如何将 dask.dataframe 预缓存到所有 worker 和分区以减少通信需求,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57274421/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com