- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我有一个 IAsyncEnumerable,它返回本质上是 Key/IEnumerable<Value>
的序列。对。我有代码使用这个和其他类似的枚举,假设它将接收一个唯一的键集合。但是我的一个数据源不遵守此约束。但是,它确实会将重复的键组合在一起。 (您不会看到 [ k1
, k2
, k1
]。)
这应该很容易用一个包装器来解决,该包装器按键对数据进行分区并连接值,除了我在 System.Linq.Async
中没有看到任何可用的分区运算符。 .有GroupBy
和 ToLookup
,但这两个都是急切的运算符,它们会立即消耗整个序列。由于涉及大量数据,这不适合我的目的。
有什么简单的方法可以对 IAsyncEnumerable
进行分区吗?类似于 GroupBy
,根据键选择器对输入进行分组,但保持其行为完全惰性并在键更改时按需生成新分组?
编辑: 我查看了 MoreLINQ 是否有类似的内容,并找到了 GroupAdjacent
, 但是 the code shows也就是说,虽然它不会急切地消耗整个输入序列,但在开始一个新组时它仍然会急切地消耗整个组。我正在寻找一种将在其分组中返回惰性可枚举的方法。这比听起来更棘手!
最佳答案
这是一个GroupAdjacent
异步序列的运算符,类似于 synonymous operator的 MoreLinq包,不同之处在于它不缓冲发出的分组的元素。分组应该以正确的顺序完全枚举,一次一个分组,否则 InvalidOperationException
将被抛出。
此实现需要包 System.Linq.Async ,因为它发出实现 IAsyncGrouping<out TKey, out TElement>
的分组界面。
/// <summary>
/// Groups the adjacent elements of a sequence according to a specified
/// key selector function.
/// </summary>
/// <remarks>
/// The groups don't contain buffered elements.
/// Enumerating the groups in the correct order is mandatory.
/// </remarks>
public static IAsyncEnumerable<IAsyncGrouping<TKey, TSource>>
GroupAdjacent<TSource, TKey>(
this IAsyncEnumerable<TSource> source,
Func<TSource, TKey> keySelector,
IEqualityComparer<TKey> keyComparer = null)
{
ArgumentNullException.ThrowIfNull(source);
ArgumentNullException.ThrowIfNull(keySelector);
keyComparer ??= EqualityComparer<TKey>.Default;
return Implementation();
async IAsyncEnumerable<IAsyncGrouping<TKey, TSource>> Implementation(
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
Tuple<TSource, TKey, bool> sharedState = null;
var enumerator = source.GetAsyncEnumerator(cancellationToken);
try
{
if (!await enumerator.MoveNextAsync().ConfigureAwait(false))
yield break;
var firstItem = enumerator.Current;
var firstKey = keySelector(firstItem);
sharedState = new(firstItem, firstKey, true);
Tuple<TSource, TKey, bool> previousState = null;
while (true)
{
var state = Volatile.Read(ref sharedState);
if (ReferenceEquals(state, previousState))
throw new InvalidOperationException("Out of order enumeration.");
var (item, key, exists) = state;
if (!exists) yield break;
previousState = state;
yield return new AsyncGrouping<TKey, TSource>(key, GetAdjacent(state));
}
}
finally { await enumerator.DisposeAsync().ConfigureAwait(false); }
async IAsyncEnumerable<TSource> GetAdjacent(Tuple<TSource, TKey, bool> state)
{
if (!ReferenceEquals(Volatile.Read(ref sharedState), state))
throw new InvalidOperationException("Out of order enumeration.");
var (stateItem, stateKey, stateExists) = state;
Debug.Assert(stateExists);
yield return stateItem;
Tuple<TSource, TKey, bool> nextState;
while (true)
{
if (!ReferenceEquals(Volatile.Read(ref sharedState), state))
throw new InvalidOperationException("Out of order enumeration.");
if (!await enumerator.MoveNextAsync().ConfigureAwait(false))
{
nextState = new(default, default, false);
break;
}
var item = enumerator.Current;
var key = keySelector(item);
if (!keyComparer.Equals(key, stateKey))
{
nextState = new(item, key, true);
break;
}
yield return item;
}
if (!ReferenceEquals(Interlocked.CompareExchange(
ref sharedState, nextState, state), state))
throw new InvalidOperationException("Out of order enumeration.");
}
}
}
private class AsyncGrouping<TKey, TElement> : IAsyncGrouping<TKey, TElement>
{
private readonly TKey _key;
private readonly IAsyncEnumerable<TElement> _sequence;
public AsyncGrouping(TKey key, IAsyncEnumerable<TElement> sequence)
{
_key = key;
_sequence = sequence;
}
public TKey Key => _key;
public IAsyncEnumerator<TElement> GetAsyncEnumerator(
CancellationToken cancellationToken = default)
{
return _sequence.GetAsyncEnumerator(cancellationToken);
}
}
使用示例:
IAsyncEnumerable<IGrouping<string, double>> source = //...
IAsyncEnumerable<IAsyncGrouping<string, double>> merged = source
.GroupAdjacent(g => g.Key)
.Select(gg => new AsyncGrouping<string, double>(
gg.Key, gg.Select(g => g.ToAsyncEnumerable()).Concat()));
此示例以包含分组的序列开始,目标是将任何具有相同键的相邻分组组合成包含所有元素的单个异步分组。申请 GroupAdjacent(g => g.Key)
后运算符我们得到这种类型:
IAsyncEnumerable<IAsyncGrouping<string, IGrouping<string, double>>>
所以在这个阶段,每个异步分组都包含内部分组,而不是单个元素。我们需要 Concat
这样的嵌套结构才能得到我们想要的。 Concat
运算符存在于 System.Interactive.Async 中包,它有这个签名:
public static IAsyncEnumerable<TSource> Concat<TSource>(
this IAsyncEnumerable<IAsyncEnumerable<TSource>> sources);
ToAsyncEnumerable
运算符 (System.Linq.Async) 附加到同步内部分组,以满足此签名。
关于c# - 如何对 IAsyncEnumerable 进行惰性分区?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/72339086/
我有以下界面: public interface IValidationSystem { IAsyncEnumerable ValidateAsync(T obj); } 我正在尝试以这种方式
我想知道是否有办法创建 IAsyncEnumerable或 IAsyncEnumerator通过 Source 对象,而不是像 TaskCompletionSource允许一个人为任务做。特别是Tas
正如标题所说,我必须执行以下功能: public async IAsyncEnumerable GetByPipeline(int pipelineId, [EnumeratorCancell
C# 8 添加了对异步迭代器 block 的支持,因此您可以等待并返回 IAsyncEnumarator 而不是 IEnumerable: public async IAsyncEnumerable
我正在尝试使用基于 IAsyncEnumerable 的 API 包装基于事件的异步订阅 API。 .基本上沿着以下路线: async IAsyncEnumerable ReadAll() {
我们有这样的代码: var intList = new List{1,2,3}; var asyncEnumerables = intList.Select(Foo); private async I
我有两种方法连接到 Foo 的两个不同来源s 返回两个 IAsyncEnumerable .我需要获取所有 Foo s 来自两个来源,然后才能处理它们。 问题 :我想同时(异步)查询两个源,即。不等S
我想知道是否有一种方法可以编写一个函数来“传递”一个 IAsyncEnumerable ......也就是说,该函数将调用另一个 IAsyncEnumerable 函数并产生所有结果而无需编写 for
我正在构建一个客户端,用于持续使用数据源中的新记录。集成是基于拉取的,我的客户定期查询数据源以获取新记录。我使用 IAsyncEnumerable 作为这个连续的新记录流的返回类型。以下是该方法的要点
我有一个这样写的界面: public interface IItemRetriever { public IAsyncEnumerable GetItemsAsync(); } 我想编写一个不
这个问题的答案可能是不可能,但问题是:假设您有一个 C# 方法来使用 TextReader 中的行返回 IAsyncEnumerable .你如何确保什么时候DisposeAsync在 IAsyncE
这个问题的答案可能是不可能,但问题是:假设您有一个 C# 方法来使用 TextReader 中的行返回 IAsyncEnumerable .你如何确保什么时候DisposeAsync在 IAsyncE
我正在使用 EF Core 3.1 和 Postgresql 对数据库运行一个查询,该查询有时会根据请求的参数返回较大的结果集。我使用 Async Enumerable 来流式传输结果,有时可能需要一
我想对一个方法进行单元测试,该方法调用返回 IAsyncEnumerable 的服务的另一个方法.我创建了一个模拟服务 Mock我想设置这个模拟,但我不知道该怎么做。是否可以 ?是否有其他单元测试方法
我有一个 IAsyncEnumerable,它返回本质上是 Key/IEnumerable 的序列。对。我有代码使用这个和其他类似的枚举,假设它将接收一个唯一的键集合。但是我的一个数据源不遵守此约束。
这个问题在这里已经有了答案: How to implement an efficient WhenEach that streams an IAsyncEnumerable of task resu
我有一种情况需要从多个 IAsyncEnumerable 源接收数据。为了提高性能,它应该以并行方式执行。 我已经使用 AsyncAwaitBestPractices 编写了这样的代码来实现这个目标,
我有一个简单的场景,我有一个具有以下方法的类: public async IAsyncEnumerable GetEntities(IQueryOptions options){ if(!vali
我正在尝试检索使用 C# 8.0 中新的 AsyncEnumerables 分页的数据。回到同步 IEnumerable 世界,代码看起来像这样: private IEnumerable Exampl
运行 IAsyncEnumerable 的枚举两次不可能? 一次CountAsync已运行,await foreach不会枚举任何项目。为什么?AsyncEnumerator 上似乎没有重置方法. v
我是一名优秀的程序员,十分优秀!