
python - Dask concatenating a Series of DataFrames


I have a Dask Series of Pandas DataFrames. I would like to convert it into a Dask DataFrame using dask.dataframe.multi.concat. However, dask.dataframe.multi.concat always requires a list of DataFrames.

I could call compute on the Dask Series of Pandas DataFrames to get a Pandas Series of DataFrames, which I could then convert to a list. But I think it would be better not to call compute, and instead obtain a Dask DataFrame directly from the Dask Series of Pandas DataFrames.
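For context, dd.concat (the public entry point for dask.dataframe.multi.concat) takes a plain Python list of frames; a minimal sketch with made-up frames a and b:

import pandas as pd
import dask.dataframe as dd

a = dd.from_pandas(pd.DataFrame({'x': [1, 2]}), npartitions=1)
b = dd.from_pandas(pd.DataFrame({'x': [3, 4]}), npartitions=1)
combined = dd.concat([a, b])  # requires a list; a Series of frames is not accepted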

What is the best way to do this? Here is the code that produces the Series of DataFrames:

import pandas as pd
import dask.dataframe as dd
import operator
import numpy as np
import math
import itertools

def apportion_pcts(pcts, total):
    """Apportion an integer by percentages
    Uses the largest remainder method
    """
    if sum(pcts) != 100:
        raise ValueError('Percentages must add up to 100')
    proportions = [total * (pct / 100) for pct in pcts]
    apportions = [math.floor(p) for p in proportions]
    remainder = total - sum(apportions)
    remainders = [(i, p - math.floor(p)) for (i, p) in enumerate(proportions)]
    remainders.sort(key=operator.itemgetter(1), reverse=True)
    for (i, _) in itertools.cycle(remainders):
        if remainder == 0:
            break
        else:
            apportions[i] += 1
            remainder -= 1
    return apportions
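
# Example (largest remainder method): apportion_pcts([80, 20], 6) floors the
# proportions [4.8, 1.2] to [4, 1], then gives the leftover unit to index 0
# (largest fractional part, 0.8), returning [5, 1].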


# images_df = dd.read_csv('./tests/data/classification/images.csv')
images_df = pd.DataFrame({"image_id": [0,1,2,3,4,5], "image_class_id": [0,1,1,3,3,5]})
images_df = dd.from_pandas(images_df, npartitions=1)

output_ratio = [80, 20]

def partition_class(partition):
    size = len(partition)
    proportions = apportion_pcts(output_ratio, size)
    slices = []
    start = 0
    for proportion in proportions:
        s = slice(start, start + proportion)
        slices.append(partition.iloc[s, :])
        start = start + proportion
    return pd.Series(slices)
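
# Example: a 10-row group with output_ratio [80, 20] is split into rows 0:8
# and 8:10, returned as a 2-element Series of DataFrames.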

partitioned_schema = dd.utils.make_meta(
    [(0, object), (1, object)], pd.Index([], name='image_class_id'))
partitioned_df = images_df.groupby('image_class_id')
partitioned_df = partitioned_df.apply(partition_class, meta=partitioned_schema)

In partitioned_df, we can get a Series of DataFrame objects via partitioned_df[0] or partitioned_df[1].
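For reference, the compute-based workaround described above would look like the following sketch; it materializes the Series eagerly, which is exactly what I hope to avoid (variable names are illustrative):

frames = partitioned_df[0].compute().tolist()  # pandas Series -> list of DataFrames
output_df = dd.concat([dd.from_pandas(f, npartitions=1) for f in frames])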


Here is a sample of the CSV file:

image_id,image_width,image_height,image_path,image_class_id
0,224,224,tmp/data/image_matrices/0.npy,5
1,224,224,tmp/data/image_matrices/1.npy,0
2,224,224,tmp/data/image_matrices/2.npy,4
3,224,224,tmp/data/image_matrices/3.npy,1
4,224,224,tmp/data/image_matrices/4.npy,9
5,224,224,tmp/data/image_matrices/5.npy,2
6,224,224,tmp/data/image_matrices/6.npy,1
7,224,224,tmp/data/image_matrices/7.npy,3
8,224,224,tmp/data/image_matrices/8.npy,1
9,224,224,tmp/data/image_matrices/9.npy,4

Afterwards I tried to perform a reduction, but it does not quite make sense because of the proxy 'foo' strings.

def zip_partitions(s):
    r = []
    for c in s.columns:
        l = s[c].tolist()
        r.append(pd.concat(l))
    return pd.Series(r)


output_df = partitioned_df.reduction(
    chunk=zip_partitions
)

The proxy list it attempts to concatenate is ['foo', 'foo']. What is the purpose of this stage? To discover how to perform the task? But then certain operations do not work. I wonder whether I am getting these strings because I am operating on object dtypes.
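For reference, Dask builds the graph by running functions on a small non-empty proxy derived from the meta, and object-dtype columns in that proxy are filled with the placeholder string 'foo', which is why zip_partitions sees ['foo', 'foo'] instead of real DataFrames. A sketch of inspecting the proxy, assuming meta_nonempty is importable from dask.dataframe.utils (its location can vary across Dask versions):

from dask.dataframe.utils import meta_nonempty

proxy = meta_nonempty(partitioned_schema)  # object columns hold 'foo' placeholders
print(proxy[0].tolist())                   # e.g. a list of 'foo' strings, not DataFrames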

Best Answer

I found the answer by applying a reduction at the end that "zips up" each column of DataFrames into a Series of concatenated DataFrames.

import pandas as pd
import dask.dataframe as dd
import operator
import numpy as np
import math
import itertools


def apportion_pcts(pcts, total):
    """Apportion an integer by percentages
    Uses the largest remainder method
    """
    if sum(pcts) != 100:
        raise ValueError('Percentages must add up to 100')
    proportions = [total * (pct / 100) for pct in pcts]
    apportions = [math.floor(p) for p in proportions]
    remainder = total - sum(apportions)
    remainders = [(i, p - math.floor(p)) for (i, p) in enumerate(proportions)]
    remainders.sort(key=operator.itemgetter(1), reverse=True)
    for (i, _) in itertools.cycle(remainders):
        if remainder == 0:
            break
        else:
            apportions[i] += 1
            remainder -= 1
    return apportions


images_df = dd.read_csv('./tests/data/classification/images.csv', blocksize=1024)

output_ratio = [80, 20]


def partition_class(group_df, ratio):
    # Slice the group's rows into consecutive chunks whose sizes follow the
    # apportioned ratio, returned as a Series of DataFrames.
    proportions = apportion_pcts(ratio, len(group_df))
    partitions = []
    start = 0
    for proportion in proportions:
        s = slice(start, start + proportion)
        partitions.append(group_df.iloc[s, :])
        start += proportion
    return pd.Series(partitions)


partitioned_schema = dd.utils.make_meta(
    [(i, object) for i in range(len(output_ratio))],
    pd.Index([], name='image_class_id'))

partitioned_df = images_df.groupby('image_class_id')
partitioned_df = partitioned_df.apply(
    partition_class, meta=partitioned_schema, ratio=output_ratio)


def zip_partitions(partitions_df):
    # For each output split (column), concatenate that column's per-class
    # DataFrames into a single DataFrame.
    partitions = []
    for i in partitions_df.columns:
        partitions.append(pd.concat(partitions_df[i].tolist()))
    return pd.Series(partitions)


zipped_schema = dd.utils.make_meta((None, object))

partitioned_ds = partitioned_df.reduction(
    chunk=zip_partitions, meta=zipped_schema)
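
A usage sketch, assuming the CSV above: partitioned_ds is a Dask Series whose i-th element is the concatenated DataFrame for output split i, so the splits can be pulled out after a compute (done here only to inspect the result; the names are illustrative):

splits = partitioned_ds.compute()         # pandas Series of two DataFrames
train_df, test_df = splits[0], splits[1]
print(len(train_df), len(test_df))        # roughly an 80/20 row split per class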

I think it should be possible to combine the apply and the reduction into a single custom aggregation that expresses a map-reduce operation.

However, I could not figure out how to do such a thing with a custom aggregation, since custom aggregations operate on series groupbys.

Visualized Graph

Regarding python - Dask concatenating a Series of DataFrames, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/58498507/
