gpt4 book ai didi

c# - 如何对 IAsyncEnumerable 进行惰性分区?

转载 作者:行者123 更新时间:2023-12-05 05:40:49 24 4
gpt4 key购买 nike

我有一个 IAsyncEnumerable,它返回本质上是 Key/IEnumerable<Value> 的序列。对。我有代码使用这个和其他类似的枚举,假设它将接收一个唯一的键集合。但是我的一个数据源不遵守此约束。但是,它确实会将重复的键组合在一起。 (您不会看到 [ k1 , k2 , k1 ]。)

这应该很容易用一个包装器来解决,该包装器按键对数据进行分区并连接值,除了我在 System.Linq.Async 中没有看到任何可用的分区运算符。 .有GroupByToLookup ,但这两个都是急切的运算符,它们会立即消耗整个序列。由于涉及大量数据,这不适合我的目的。

有什么简单的方法可以对 IAsyncEnumerable 进行分区吗?类似于 GroupBy ,根据键选择器对输入进行分组,但保持其行为完全惰性并在键更改时按需生成新分组?

编辑: 我查看了 MoreLINQ 是否有类似的内容,并找到了 GroupAdjacent , 但是 the code shows也就是说,虽然它不会急切地消耗整个输入序列,但在开始一个新组时它仍然会急切地消耗整个组。我正在寻找一种将在其分组中返回惰性可枚举的方法。这比听起来更棘手!

最佳答案

这是一个GroupAdjacent异步序列的运算符,类似于 synonymous operatorMoreLinq包,不同之处在于它不缓冲发出的分组的元素。分组应该以正确的顺序完全枚举,一次一个分组,否则 InvalidOperationException将被抛出。

此实现需要包 System.Linq.Async ,因为它发出实现 IAsyncGrouping<out TKey, out TElement> 的分组界面。

/// <summary>
/// Groups the adjacent elements of a sequence according to a specified
/// key selector function.
/// </summary>
/// <remarks>
/// The groups don't contain buffered elements.
/// Enumerating the groups in the correct order is mandatory.
/// </remarks>
public static IAsyncEnumerable<IAsyncGrouping<TKey, TSource>>
GroupAdjacent<TSource, TKey>(
this IAsyncEnumerable<TSource> source,
Func<TSource, TKey> keySelector,
IEqualityComparer<TKey> keyComparer = null)
{
ArgumentNullException.ThrowIfNull(source);
ArgumentNullException.ThrowIfNull(keySelector);
keyComparer ??= EqualityComparer<TKey>.Default;
return Implementation();

async IAsyncEnumerable<IAsyncGrouping<TKey, TSource>> Implementation(
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
Tuple<TSource, TKey, bool> sharedState = null;
var enumerator = source.GetAsyncEnumerator(cancellationToken);
try
{
if (!await enumerator.MoveNextAsync().ConfigureAwait(false))
yield break;
var firstItem = enumerator.Current;
var firstKey = keySelector(firstItem);
sharedState = new(firstItem, firstKey, true);

Tuple<TSource, TKey, bool> previousState = null;
while (true)
{
var state = Volatile.Read(ref sharedState);
if (ReferenceEquals(state, previousState))
throw new InvalidOperationException("Out of order enumeration.");
var (item, key, exists) = state;
if (!exists) yield break;
previousState = state;
yield return new AsyncGrouping<TKey, TSource>(key, GetAdjacent(state));
}
}
finally { await enumerator.DisposeAsync().ConfigureAwait(false); }

async IAsyncEnumerable<TSource> GetAdjacent(Tuple<TSource, TKey, bool> state)
{
if (!ReferenceEquals(Volatile.Read(ref sharedState), state))
throw new InvalidOperationException("Out of order enumeration.");
var (stateItem, stateKey, stateExists) = state;
Debug.Assert(stateExists);
yield return stateItem;
Tuple<TSource, TKey, bool> nextState;
while (true)
{
if (!ReferenceEquals(Volatile.Read(ref sharedState), state))
throw new InvalidOperationException("Out of order enumeration.");
if (!await enumerator.MoveNextAsync().ConfigureAwait(false))
{
nextState = new(default, default, false);
break;
}
var item = enumerator.Current;
var key = keySelector(item);
if (!keyComparer.Equals(key, stateKey))
{
nextState = new(item, key, true);
break;
}
yield return item;
}
if (!ReferenceEquals(Interlocked.CompareExchange(
ref sharedState, nextState, state), state))
throw new InvalidOperationException("Out of order enumeration.");
}
}
}

private class AsyncGrouping<TKey, TElement> : IAsyncGrouping<TKey, TElement>
{
private readonly TKey _key;
private readonly IAsyncEnumerable<TElement> _sequence;

public AsyncGrouping(TKey key, IAsyncEnumerable<TElement> sequence)
{
_key = key;
_sequence = sequence;
}

public TKey Key => _key;

public IAsyncEnumerator<TElement> GetAsyncEnumerator(
CancellationToken cancellationToken = default)
{
return _sequence.GetAsyncEnumerator(cancellationToken);
}
}

使用示例:

IAsyncEnumerable<IGrouping<string, double>> source = //...

IAsyncEnumerable<IAsyncGrouping<string, double>> merged = source
.GroupAdjacent(g => g.Key)
.Select(gg => new AsyncGrouping<string, double>(
gg.Key, gg.Select(g => g.ToAsyncEnumerable()).Concat()));

此示例以包含分组的序列开始,目标是将任何具有相同键的相邻分组组合成包含所有元素的单个异步分组。申请 GroupAdjacent(g => g.Key) 后运算符我们得到这种类型:

IAsyncEnumerable<IAsyncGrouping<string, IGrouping<string, double>>>

所以在这个阶段,每个异步分组都包含内部分组,而不是单个元素。我们需要 Concat这样的嵌套结构才能得到我们想要的。 Concat运算符存在于 System.Interactive.Async 中包,它有这个签名:

public static IAsyncEnumerable<TSource> Concat<TSource>(
this IAsyncEnumerable<IAsyncEnumerable<TSource>> sources);

ToAsyncEnumerable运算符 (System.Linq.Async) 附加到同步内部分组,以满足此签名。

关于c# - 如何对 IAsyncEnumerable 进行惰性分区?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/72339086/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com