gpt4 book ai didi

c# - SelectMany 带有等待和结果排序

转载 作者:行者123 更新时间:2023-12-02 09:16:35 26 4
gpt4 key购买 nike

文章中Deep Dive into Rx SelectMany作者在最后的注释中提到了以下内容;

Note: To mitigate the ordering issue, SelectMany() comes with an overload which takes a selector with signature Func<TSource, int,
Task<TResult>>
.

有人能告诉我这是如何工作的吗?

最佳答案

@supertopi 的答案是正确的。

为了好玩,我想我应该创建一个扩展方法 ToOrdinalOrder 来强制输出顺序,然后可以将其与 SelectMany 拼接在一起以获得 的版本SelectMany,其中输出顺序与输入顺序匹配。

public static class Extensions
{
public static IObservable<T> ToOrdinalOrder<T>(this IObservable<T> source, Func<T, int> indexSelector)
{
return source
.Scan((expectedIndex: 0, heldItems: ImmutableDictionary<int, T>.Empty, toReturn: Observable.Empty<T>()), (state, item) =>
{
var itemIndex = indexSelector(item);
if (itemIndex == state.expectedIndex)
{
var toReturn = Observable.Return(item);
var expectedIndex = itemIndex + 1;
var heldItems = state.heldItems;
while (heldItems.ContainsKey(expectedIndex))
{
toReturn = toReturn.Concat(Observable.Return(heldItems[expectedIndex]));
heldItems = heldItems.Remove(expectedIndex);
expectedIndex++;
}
return (expectedIndex, heldItems, toReturn);
}
else
return (state.expectedIndex, state.heldItems.Add(itemIndex, item), Observable.Empty<T>());
})
.SelectMany(t => t.toReturn);
}

public static IObservable<TResult> OrderedSelectMany<TSource, TResult>(this IObservable<TSource> source, Func<TSource, Task<TResult>> selector)
{
return source
.SelectMany(async (TSource item, int index) => (await selector(item), index))
.ToOrdinalOrder(t => t.Item2)
.Select(t => t.Item1);
}
}

这里有一些示例代码来演示用法:

var source = new Subject<string>();

source
.Select((item, index) => (item, index)) //index only required for logging purposes.
.OrderedSelectMany(async t =>
{
var item = t.Item1;
var index = t.Item2;
Console.WriteLine($"Starting item {item}, index {index}.");
switch (index)
{
case 0:
await Task.Delay(TimeSpan.FromMilliseconds(50));
Console.WriteLine($"Completing item {item}, index {index}.");
return (item, index);
case 1:
await Task.Delay(TimeSpan.FromMilliseconds(200));
Console.WriteLine($"Completing item {item}, index {index}.");
return (item, index);
case 2:
await Task.Delay(TimeSpan.FromMilliseconds(20));
Console.WriteLine($"Completing item {item}, index {index}.");
return (item, index);
default:
throw new NotImplementedException();
}
})
.Subscribe(s => Console.WriteLine($"Received item '{s}'."));

source.OnNext("A");
source.OnNext("B");
source.OnNext("C");
source.OnCompleted();

输出如下:

Starting item A, index 0.
Starting item B, index 1.
Starting item C, index 2.
Completing item C, index 2.
Completing item A, index 0.
Received item '(A, 0)'.
Completing item B, index 1.
Received item '(B, 1)'.
Received item '(C, 2)'.

关于c# - SelectMany 带有等待和结果排序,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44677052/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com