gpt4 book ai didi

c# - 限制 Observable GroupBy/Merge 组合的并发

转载 作者:行者123 更新时间:2023-11-30 17:25:43 24 4
gpt4 key购买 nike

我们正在使用 C# 和 Reactive Extensions 实现一些软件组件。它包含使用 GroupBy 方法拆分 observable 的功能,然后对这些拆分的 observable 执行一些算术运算,然后使用 Merge() 方法将 observable 合并回一起。

如果不使用 maxConcurrent 参数,一切顺利。因为如果使用此参数,数据似乎“丢失”了。

尝试搜索此问题。尝试合并 Observable.StartObservable.Defer 但没有结果。创建了一个真正的小型测试应用程序来展示问题。

var sourceObservable = Enumerable.Range(0, 10)
.Select(x => new { Index = x, Remainder = x % 3 }).ToObservable();

var ungrouped = sourceObservable.Select(x => x.Index);
var limitedGrouping = sourceObservable.GroupBy(x => x.Remainder)
.Select(group => group.Select(x => x.Index)).Merge(maxConcurrent: 2);
var unlimitedGrouping = sourceObservable.GroupBy(x => x.Remainder)
.Select(group => group.Select(x => x.Index)).Merge();

Console.WriteLine($"ungrouped: {string.Join(",", await ungrouped.ToList())}");
Console.WriteLine($"limited: {string.Join(",", await limitedGrouping.ToList())}");
Console.WriteLine($"unlimited: {string.Join(",", await unlimitedGrouping.ToList())}");

预计在这种情况下,“limitedGrouping”内容将与“unlimitedGrouping”内容相同。然而它不是:

未分组:0,1,2,3,4,5,6,7,8,9

有限:0,1,3,4,6,7,9

无限制:0,1,2,3,4,5,6,7,8,9

有限的一个缺少数据编号 2、5 和 8。我们在这里犯了什么错误?

最佳答案

它看起来像是 GroupBy 中预期但令人困惑的功能。此代码是等效的,但同样会失败:

var source = Observable.Range(0, 10);
source
.GroupBy(i => i % 3)
.Merge(2)
.Subscribe(Console.WriteLine); //Outputs 0 1 3 4 6 7 9

这段代码很相似,但是成功了:

var a = source.Where(i => i % 3 == 0);
var b = source.Where(i => i % 3 == 1);
var c = source.Where(i => i % 3 == 2);
var l = new List<IObservable<int>>() { a, b, c };
l.ToObservable()
.Merge(2)
.Subscribe(Console.WriteLine); //Outputs 0 1 3 4 6 7 9 2 5 8

有点迷幻的是:

source
.GroupBy(i => i % 3)
.Concat() //or .Merge(1), those are roughly equivalent.
.Subscribe(Console.WriteLine); //Outputs 0 3 6 9

当我第一次看到它时,我预计所有 Merge(2) 案例都是 0 1 3 4 6 7 9 2 5 8。我预计 Concat,基本上是 Merge(1)0 3 6 9 1 4 7 2 5 8

maxConcurrent(n) 表示一次只能订阅 n 个可观察对象。如果它接收到超过 n 个 observables,那么它会将额外的 observables 排队,在旧的 observables 结束时稍后订阅。

在我们的例子中,它按顺序接收三个可观察值(mod-0、mod-1 和 mod-2)。它订阅前两个,然后将 mod-2 observable 排队,仅在 mod-0 或 mod-1 完成时订阅。然而,当 mod-0/mod-1 observable 完成时,mod-2 observable 显然也完成了,所以没有收到通知。

当我第一次看到这个时,我认为这是一个错误,因为我认为 GroupBy 的子可观察对象应该是冷的。但看起来它们总体上是温暖的,如果这有任何意义的话:订阅其中一个 child ,其他 child 就会变得炙手可热。这在 GroupBy 可以用作冷或热可观察对象的运算符的上下文中是有意义的,并且没有内置重放功能。

如果你想看到这个演示,考虑这个:

source
.GroupBy(i => i % 3)
.Select(o => o.Take(3))
.Merge(2)
.Subscribe(Console.WriteLine); //Outputs 0 1 3 4 6 7 8

这里 mod-0 observable 从 6 之后取消订阅,第三个 mod-0 数字。 Merge 然后订阅热的 mod-2 observable,输出最后一个 mod-2 数字 8。

希望对您有所帮助。如果您不熟悉可观察温度的 System.Reactive 概念,我推荐 this article .

关于c# - 限制 Observable GroupBy/Merge 组合的并发,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57788750/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com