gpt4 book ai didi

c# - 以有限订阅者同时订阅可观察集合的简单方法

转载 作者:太空宇宙 更新时间:2023-11-03 18:27:14 24 4
gpt4 key购买 nike

我一直在尝试使用 Rx 和可观察集合来实现一个简单的生产者-消费者模式。我还需要能够轻松地限制订阅者的数量。我在并行扩展中看到了很多对 LimitedConcurrencyLevelTask​​Scheduler 的引用,但我似乎无法让它使用多线程。

我认为我在做一些愚蠢的事情,所以我希望有人能解释一下。在下面的单元测试中,我希望使用多个 (2) 线程来使用阻塞集合中的字符串。我做错了什么?

[TestClass]
public class LimitedConcurrencyLevelTaskSchedulerTestscs
{
private ConcurrentBag<string> _testStrings = new ConcurrentBag<string>();
ConcurrentBag<int> _threadIds= new ConcurrentBag<int>();

[TestMethod]
public void WhenConsumingFromBlockingCollection_GivenLimitOfTwoThreads_TwoThreadsAreUsed()
{

// Setup the command queue for processing combinations
var commandQueue = new BlockingCollection<string>();

var taskFactory = new TaskFactory(new LimitedConcurrencyLevelTaskScheduler(2));
var scheduler = new TaskPoolScheduler(taskFactory);

commandQueue.GetConsumingEnumerable()
.ToObservable(scheduler)
.Subscribe(Go, ex => { throw ex; });

var iterationCount = 100;
for (int i = 0; i < iterationCount; i++)
{
commandQueue.Add(string.Format("string {0}", i));
}
commandQueue.CompleteAdding();

while (!commandQueue.IsCompleted)
{
Thread.Sleep(100);
}

Assert.AreEqual(iterationCount, _testStrings.Count);
Assert.AreEqual(2, _threadIds.Distinct().Count());
}

private void Go(string testString)
{
_testStrings.Add(testString);
_threadIds.Add(Thread.CurrentThread.ManagedThreadId);
}
}

最佳答案

似乎每个人都经历了与 Rx 相同的学习曲线。需要理解的是,Rx 不会进行并行处理,除非您明确地进行强制并行的查询。调度程序引入并行性。

Rx 有一个行为契约,它说零个或多个值是连续产生的(不管可能使用多少线程),一个接一个,没有重叠,最后跟一个可选的单个错误或单个完整的消息,然后没有别的。

这通常写成 OnNext*(OnError|OnCompleted)

调度器所做的只是定义规则,以确定在哪个线程上处理新值如果调度器没有正在为当前可观察值处理的待处理值

现在获取您的代码:

var taskFactory = new TaskFactory(new LimitedConcurrencyLevelTaskScheduler(2));
var scheduler = new TaskPoolScheduler(taskFactory);

这表示调度程序将在两个线程之一上运行订阅值。但这并不意味着它会对产生的每个值(value)都这样做。请记住,由于值是连续产生的,一个接一个,因此最好重新使用现有线程,而不是去创建一个新线程的高成本。因此,Rx 所做的是重用现有线程如果在当前值处理完成之前在调度程序上调度了一个新值。

这是关键 - 如果在现有值的处理完成之前安排了新值,它会重新使用线程。

所以你的代码是这样做的:

commandQueue.GetConsumingEnumerable()
.ToObservable(scheduler)
.Subscribe(Go, ex => { throw ex; });

这意味着调度程序只会在第一个值出现时创建一个线程。但是当昂贵的线程创建操作完成时,将值添加到 commandQueue 的代码也已完成,因此它已将它们全部排队,因此它可以更有效地使用单个线程而不是创建一个线程昂贵的第二个。

为避免这种情况,您需要构造查询以引入并行性。

方法如下:

public void WhenConsumingFromBlockingCollection_GivenLimitOfTwoThreads_TwoThreadsAreUsed()
{
var taskFactory = new TaskFactory(new LimitedConcurrencyLevelTaskScheduler(2));
var scheduler = new TaskPoolScheduler(taskFactory);

var iterationCount = 100;

Observable
.Range(0, iterationCount)
.SelectMany(n => Observable.Start(() => n.ToString(), scheduler)
.Do(x => Go(x)))
.Wait();

(iterationCount == _testStrings.Count).Dump();
(2 == _threadIds.Distinct().Count()).Dump();
}

现在,我使用了 Do(...)/.Wait() 组合来为您提供相当于阻塞 .Subscribe( ...) 方法。

这个结果是你的断言都返回 true。

关于c# - 以有限订阅者同时订阅可观察集合的简单方法,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31686336/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com