How do I GroupShuffleSplit a parquet dataframe lazily?

I have a parquet dataset that looks like this (I'm using polars, but any dataframe library is fine):


import polars as pl

df = pl.DataFrame({
    "match_id": [
        1, 1, 1,
        2, 2, 2, 2,
        3, 3, 3, 3,
    ],
    "team_id": [
        1, 2, 2,
        1, 1, 2, 2,
        1, 2, 2, 3,
    ],
    "player_name": [
        "kevin", "james", "kelly",
        "john", "jenny", "jim", "josh",
        "alice", "kevin", "lilly", "erica",
    ],
})

I would like to group by match_id and do a train/test split such that 80% of matches are in the training set and the rest are in the test set. So something like this:


group_df = df.group_by(["match_id"])
train, test = group_split(group_df, test_size=0.20)

I need a Python solution, preferably with Dask, pandas, or another dataframe library. pandas doesn't support lazy evaluation and the dataset is quite large, so it seems out of the question to use pandas. Dask, on the other hand, doesn't support any of the sklearn.model_selection splitters, since it doesn't have integer-based indexing support.


Ideally a simple GroupShuffleSplit working with dask is all I need. Is there any other library that supports this? If so, how do I do this with parquet in a lazy way?



Could you clarify what sort of output you want? GroupShuffleSplit returns indices - would you be happy with the actual rows?


What is the group you want to shuffle over? Please help us a bit by constructing a sample input dataframe with at least a few groups, and ideally also an output dataframe explaining what it could look like. It’s great if you add this as pl.DataFrame(…). If you have a dataframe already, you can generate the dict to pass to this by df.head().to_dict(as_series=False). Make sure you have more than one group though.


@TomNorway Added an example dataframe. I assume in Polars it would be a LazyFrame with streaming enabled. And yes, rows are fine as long as it works on low-RAM machines.



Maybe something like this will work for you.

It is not a perfect answer, but it tries to tackle the problem of the data being very large.

In this solution, GroupShuffleSplit runs on each partition of the data rather than on the whole dataset. Because match_id.unique is taken per partition, the resulting split may not be 80/20 at all, and a match that spans several partitions may end up in train in one partition and in test in another.



import dask.dataframe as dd
import numpy as np
from sklearn.model_selection import GroupShuffleSplit

df = dd.read_parquet('test.parquet')  # the test file generated below

train = []
test = []
gss = GroupShuffleSplit(n_splits=1, test_size=0.20, random_state=42)  # Adjust random_state as needed

for i in range(df.npartitions):
    part = df.partitions[i]
    # unique match_ids of this partition only, as a NumPy array
    groups = part.match_id.unique().compute().to_numpy()
    # split() yields positions into `groups`, so map them back to match_ids
    train_idx, test_idx = next(gss.split(groups, groups=groups))
    train += [part[part.match_id.isin(groups[train_idx])]]
    test += [part[part.match_id.isin(groups[test_idx])]]

# now the test list holds one dask dataframe per partition
# to fetch data from them just concat and compute

dd.concat(test).shape[0].compute()  # gives 282_111_648 in my case

Solution tested with this data:

import polars as pl
import numpy as np

# polars version '0.19.2'
n_rows = 10**6  # 1_000_000 rows
df = pl.DataFrame([
    pl.Series('match_id', np.random.choice(range(10**3), size=n_rows)),  # 1_000 matches
    pl.Series('team_id', np.random.choice(range(10**2), size=n_rows)),  # 100 teams
    pl.Series('player_name', np.random.choice([
        "kevin", "james", "kelly",
        "john", "jenny", "jim", "josh",
        "alice", "kevin", "lilly", "erica",
    ], size=n_rows)),
])
df = pl.concat([df.lazy()] * 1_000)  # 1_000_000_000 rows, kept lazy
df.collect(streaming=True).write_parquet('test.parquet')  # ~5GB
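
One way around the per-partition caveat mentioned in this answer is to decide each match's side from the match_id alone, so that every partition reaches the same decision without seeing the others. The sketch below is not GroupShuffleSplit: it swaps in a seeded hash, and `assign_split` and the two partition lists are hypothetical names used only for illustration. The test share is only approximately `test_size`, since it depends on where the hashes fall.

```python
import hashlib

def assign_split(match_id, test_size=0.20, seed=42):
    """Map a group key to 'train' or 'test' using only the key and the seed."""
    digest = hashlib.sha256(f"{seed}:{match_id}".encode()).digest()
    # First 8 bytes as a fraction in [0, 1); stable across processes and runs
    bucket = int.from_bytes(digest[:8], "big") / 2**64
    return "test" if bucket < test_size else "train"

# Two hypothetical partitions that both contain rows for match_id 2
partition_a = [1, 1, 2, 2]
partition_b = [2, 3, 3]

labels_a = {m: assign_split(m) for m in partition_a}
labels_b = {m: assign_split(m) for m in partition_b}

# The shared match gets the same label in both partitions
print(labels_a[2] == labels_b[2])
```

With Dask, a function like this could presumably be applied to the match_id column (for example through `Series.map`) to build a boolean mask, which would keep the filtering lazy.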

import dask.dataframe as dd
from sklearn.model_selection import GroupShuffleSplit

ddf = dd.read_parquet('your_dataset.parquet')

def dask_group_shuffle_split(df, groups, test_size=0.2, random_state=None):
    # Get the unique group values; only this small array is materialized
    unique_groups = df[groups].drop_duplicates().compute().to_numpy()

    # Perform the GroupShuffleSplit over the unique groups, not over the rows
    splitter = GroupShuffleSplit(n_splits=1, test_size=test_size, random_state=random_state)
    train_idx, test_idx = next(splitter.split(unique_groups, groups=unique_groups))

    # Map the split positions back to group values
    train_groups = unique_groups[train_idx]
    test_groups = unique_groups[test_idx]

    # Filter the original DataFrame lazily by group membership
    train = df[df[groups].isin(train_groups)]
    test = df[df[groups].isin(test_groups)]

    return train, test, train_groups, test_groups

train, test, train_groups, test_groups = dask_group_shuffle_split(ddf, groups='match_id', test_size=0.2, random_state=42)

Let's agree that if your primary objective is to split your data while preserving groups, and you want to work with a lazy computation engine, Dask is indeed a good choice.


import dask.dataframe as dd
import numpy as np
import pandas as pd

# For this example, I used the 'from_pandas' method (for my environment).
# In your actual use case, use dd.read_parquet() instead.

pdf = pd.DataFrame({
    "match_id": [
        1, 1, 1,
        2, 2, 2, 2,
        3, 3, 3, 3,
    ],
    "team_id": [
        1, 2, 2,
        1, 1, 2, 2,
        1, 2, 2, 3,
    ],
    "player_name": [
        "kevin", "james", "kelly",
        "john", "jenny", "jim", "josh",
        "alice", "kevin", "lilly", "erica",
    ],
})

ddf = dd.from_pandas(pdf, npartitions=2)

# Here you need the unique match_ids
unique_matches = ddf['match_id'].unique().compute()

# Here you need to shuffle the unique matches
shuffled_matches = np.random.permutation(unique_matches)

# Here you need to split indices for train-test
split_idx = int(0.8 * len(shuffled_matches))

train_matches = shuffled_matches[:split_idx]
test_matches = shuffled_matches[split_idx:]

# Then filter out the records based on match_id
train_df = ddf[ddf['match_id'].isin(train_matches)]
test_df = ddf[ddf['match_id'].isin(test_matches)]

# The above operation is still lazy. You can compute to get the actual dataframe.
train_df_computed = train_df.compute()
test_df_computed = test_df.compute()


My approach is manual and not as elegant as using GroupShuffleSplit, but it serves the purpose. If your data is already in a Parquet file, you can use dd.read_parquet() to read it directly into a Dask DataFrame.
Calling .compute() triggers the actual computation in Dask; before that, everything is just a lazy operation.
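
Since this split is by match rather than by row, the row counts on each side follow the sizes of the matches that happen to land there. A small plain-Python check (no Dask involved; `check_group_split` is a hypothetical helper, and the chosen test match is just an example) can confirm that no match_id sits on both sides and report the realized row share:

```python
from collections import Counter

def check_group_split(match_ids, train_matches, test_matches):
    """Return the share of rows on the test side; raise if a group leaks."""
    overlap = set(train_matches) & set(test_matches)
    if overlap:
        raise ValueError(f"groups on both sides: {sorted(overlap)}")
    rows_per_match = Counter(match_ids)
    test_rows = sum(rows_per_match[m] for m in test_matches)
    return test_rows / len(match_ids)

# match_id column from the question's example: 3, 4 and 4 rows per match
match_ids = [1, 1, 1, 2, 2, 2, 2, 3, 3, 3, 3]

# Suppose the shuffle put matches 1 and 2 in train and match 3 in test
test_share = check_group_split(match_ids, train_matches=[1, 2], test_matches=[3])
print(f"{test_share:.2f}")  # 4 of 11 rows
```

Here one of three matches holds 4 of the 11 rows, so with few or uneven groups neither the match share nor the row share will land exactly on 20%.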


If you want to use the GroupBy approach only, here's a way you can do that.


import math

import pandas as pd

def group_split(grouped_data, test_size=0.2):
    # grouped_data is a pandas GroupBy object
    ngroups = grouped_data.ngroups
    train_size = ngroups - math.ceil(ngroups * test_size)

    group_names = list(grouped_data.groups.keys())
    train_data = pd.concat((grouped_data.get_group(group_id) for group_id in group_names[:train_size]), ignore_index=True)
    test_data = pd.concat((grouped_data.get_group(group_id) for group_id in group_names[train_size:]), ignore_index=True)
    return train_data, test_data

Sample output:


group_split(grouped, 0.2)

(   match_id  team_id player_name
 0         1        1       kevin
 1         1        2       james
 2         1        2       kelly
 3         2        1        john
 4         2        1       jenny
 5         2        2         jim
 6         2        2        josh,
    match_id  team_id player_name
 0         3        1       alice
 1         3        2       kevin
 2         3        2       lilly
 3         3        3       erica)

You can add shuffling and other options as well by reordering the group_names variable; this is not included here for brevity.
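
The shuffling mentioned above could be sketched as follows. `split_group_names` is a hypothetical helper, not part of pandas; it uses the same `math.ceil` sizing as `group_split` and a seeded `random.Random` so that the split is reproducible. Inside `group_split`, its input would be `list(grouped_data.groups.keys())`.

```python
import math
import random

def split_group_names(group_names, test_size=0.2, seed=None):
    """Shuffle group keys, then cut them into train and test lists."""
    names = list(group_names)  # copy, so the caller's list is left untouched
    random.Random(seed).shuffle(names)
    train_size = len(names) - math.ceil(len(names) * test_size)
    return names[:train_size], names[train_size:]

train_names, test_names = split_group_names([1, 2, 3], test_size=0.2, seed=42)
print(len(train_names), len(test_names))  # 2 1
```

The two returned lists can then replace `group_names[:train_size]` and `group_names[train_size:]` in the `pd.concat` calls.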


You can use dask.dataframe to perform the grouping and splitting operation on your Parquet dataset. Here's an example code snippet that should accomplish what you described:


import dask.dataframe as dd
from sklearn.model_selection import GroupShuffleSplit
# Load your Parquet file into a dask dataframe
dd_df = dd.read_parquet('path/to/your/file.parquet')
# Collect the unique match_id values (small compared with the full dataset)
match_ids = dd_df['match_id'].unique().compute().to_numpy()
# Split the match_ids, not the rows, into train and test groups
splitter = GroupShuffleSplit(n_splits=1, test_size=0.2, random_state=42)
train_idx, test_idx = next(splitter.split(match_ids, groups=match_ids))
# Filter lazily by group membership
train = dd_df[dd_df['match_id'].isin(match_ids[train_idx])]
test = dd_df[dd_df['match_id'].isin(match_ids[test_idx])]
# The resulting train and test dataframes will be dask dataframes

This code uses dask.dataframe to load your Parquet file and split it by group. The GroupShuffleSplit class from sklearn.model_selection is applied to the unique match_id values (Dask does not ship its own version), and the rows are then filtered by match_id. The random_state parameter is set to 42 to ensure reproducibility.
The resulting train and test dataframes will be dask dataframes, which you can then use for further processing or modeling tasks. Note that since you're working with a large dataset, it's advisable to work with dask dataframes instead of pandas dataframes to avoid memory constraints.



"out of the question to use pandas" from OP


