gpt4 book ai didi

python - 使用自定义索引重新分区 Dask Dataframe

转载 作者:太空宇宙 更新时间:2023-11-03 21:20:27 24 4
gpt4 key购买 nike

我有一个与此类似的巨大 Dask Dataframe

|Ind| C1 | C2 |....| Cn |
|-----------------------|
| 1 |val1| AE |....|time|
|-----------------------|
| 2 |val2| FB |....|time|
|-----------------------|
|...|....| .. |....| ...|
|-----------------------|
|n-1|valx| ZK |....|time|
| n |valn| QK |....|time|

我想根据 C2 列的唯一值对其进行重新分区,并将一个函数映射到每个分区。

首先我将 C2 设置为索引:

df = dd.readcsv(...)

df = df.set_index(df.C2)

现在我想重新分区新索引的数据帧并将函数映射到每个分区。我当前的方法如下:

unique_c2 = df.index.unique().compute()

df = df.repartition(division=list(unique_c2))

# list(unique_c2) looks like this: ['AE', 'FB', ..., 'ZK', 'QK']

df.map_partitions(lambda x: my_func(x), meta=df)

我想要的分区应该如下所示:

|Ind | C1 | C2 |....| Cn |
|------------------------|
| AE |val1| AE |....|time|
|------------------------|
| AE |val2| AE |....|time|
|------------------------|
|....|....| .. |....| ...|
|------------------------|
| AE |valn| AE |....|time|

...

|Ind | C1 | C2 |....| Cn |
|------------------------|
| ZK |val1| ZK |....|time|
|------------------------|
| ZK |val2| ZK |....|time|
|------------------------|
|....|....| .. |....| ...|
|------------------------|
| ZK |valn| ZK |....|time|

|Ind | C1 | C2 |....| Cn |
|------------------------|
| QK |val1| QK |....|time|
|------------------------|
| QK |val2| QK |....|time|
|------------------------|
|....|....| .. |....| ...|
|------------------------|
| QK |valn| QK |....|time|

但是重新分区函数“合并”了我的最后两个索引,因此我的最后一个分区如下所示:

|Ind | C1 | C2 |....| Cn |
|------------------------|
| ZK |val1| ZK |....|time|
|------------------------|
| ZK |val2| ZK |....|time|
|------------------------|
|....|....| .. |....| ...|
|------------------------|
| QK |valn| QK |....|time|
|------------------------|
|....|....| .. |....| ...|
|------------------------|
| QK |valn| QK |....|time|

您知道为什么会发生这种情况吗?或者您对我的问题有更好的解决方案吗?我知道有一个 dask.groupby(...).apply(...)。但我的映射函数有副作用,并且 apply(...) 在设计上总是为每个 dask 分区执行两次。

最佳答案

由于其设计方式,分区数始终为(分区数 + 1)。来自 docs :

Divisions includes the minimum value of every partition’s index and the maximum value of the last partition’s index.

由于您设置了 divisions=list(unique_c2),因此您的划分数量将仅与您要进行分区的唯一 c2 值的数量相同。因此分区的数量将比您期望的少 1。

您可以通过将代码更改为来解决此问题:

    unique_c2_list = list(df.index.unique().compute())    df = df.repartition(divisions=sorted(unique_c2_list + [unique_c2_list[-1]]))

这会将最后一个唯一的 c2 值添加到分区列表的末尾。对于最终的划分,c2 的最小值和最大值将相同,因此这将产生所需数量的分区并防止最后两个被合并。

关于python - 使用自定义索引重新分区 Dask Dataframe,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54307559/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com