gpt4 book ai didi

pyarrow - 在 pyarrow 表中删除重复项?

转载 作者:行者123 更新时间:2023-12-05 02:02:16 44 4
gpt4 key购买 nike

有没有一种方法可以使用纯 pyarrow 表对数据进行排序并删除重复项?我的目标是根据最大更新时间戳检索每个 ID 的最新版本。

一些额外的细节:我的数据集通常至少分为两个版本:

  • 历史
  • 最终

历史数据集将包括来自源的所有更新项目,因此对于发生在它身上的每个更改,单个 ID 可能有重复项(例如,想象一张 Zendesk 或 ServiceNow 票证,其中一张票证可以多次更新次)

然后我使用过滤器读取历史数据集,将其转换为 pandas DF,对数据进行排序,然后在某些唯一约束列上删除重复项。

dataset = ds.dataset(history, filesystem, partitioning)
table = dataset.to_table(filter=filter_expression, columns=columns)
df = table.to_pandas().sort_values(sort_columns, ascending=True).drop_duplicates(unique_constraint, keep="last")
table = pa.Table.from_pandas(df=df, schema=table.schema, preserve_index=False)

# ds.write_dataset(final, filesystem, partitioning)

# I tend to write the final dataset using the legacy dataset so I can make use of the partition_filename_cb - that way I can have one file per date_id. Our visualization tool connects to these files directly
# container/dataset/date_id=20210127/20210127.parquet

pq.write_to_dataset(final, filesystem, partition_cols=["date_id"], use_legacy_dataset=True, partition_filename_cb=lambda x: str(x[-1]).split(".")[0] + ".parquet")

如果可能的话,最好不要再转换为 pandas 然后再转换回表格。

最佳答案

编辑 2022 年 3 月:PyArrow 正在添加更多功能,尽管目前还没有这个功能。我现在的做法是:

def drop_duplicates(table: pa.Table, column_name: str) -> pa.Table:
unique_values = pc.unique(table[column_name])
unique_indices = [pc.index(table[column_name], value).as_py() for value in unique_values]
mask = np.full((len(table)), False)
mask[unique_indices] = True
return table.filter(mask=mask)

//结束编辑

我看到了你的问题,因为我有一个类似的问题,我在工作中解决了它(由于 IP 问题,我不能发布整个代码,但我会尽力回答。我已经以前从未这样做过)

import pyarrow.compute as pc
import pyarrow as pa
import numpy as np

array = table.column(column_name)
dicts = {dct['values']: dct['counts'] for dct in pc.value_counts(array).to_pylist()}
for key, value in dicts.items():
# do stuff

我使用“value_counts”来查找唯一值及其数量(https://arrow.apache.org/docs/python/generated/pyarrow.compute.value_counts.html)。然后我迭代了这些值。如果值为 1,我使用

选择了该行
mask = pa.array(np.array(array) == key)
row = table.filter(mask)

如果计数大于 1,我会再次使用 numpy bool 数组作为掩码来选择第一个或最后一个。

迭代后就和pa.concat_tables(tables)一样简单了

警告:这是一个缓慢的过程。如果您需要一些快速而肮脏的东西,请尝试“独特”选项(也在我提供的同一链接中)。

编辑/额外::您可以通过在遍历字典时保持 bool 掩码的 numpy 数组来使其更快/占用更少的内存。然后最后你返回一个“table.filter(mask=boolean_mask)”。我不知道如何计算速度...

编辑2:(抱歉进行了很多编辑。我一直在进行大量重构,并试图让它更快地工作。)

你也可以试试这样的:

def drop_duplicates(table: pa.Table, col_name: str) ->pa.Table:
column_array = table.column(col_name)
mask_x = np.full((table.shape[0]), False)
_, mask_indices = np.unique(np.array(column_array), return_index=True)
mask_x[mask_indices] = True
return table.filter(mask=mask_x)

关于pyarrow - 在 pyarrow 表中删除重复项?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/65937043/

44 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com