gpt4 book ai didi

system.reactive - Rx 队列实现和 Dispatcher 的缓冲区

转载 作者:行者123 更新时间:2023-12-04 07:19:51 25 4
gpt4 key购买 nike

我想实现一个队列,它能够在多个线程中从多个生产者那里获取事件/项目,并在单个线程上使用它们。这个队列会在一些关键环境中工作,所以我很关心它的稳定性。

我已经使用 Rx 功能实现了它,但我有 2 个问题:

  1. 这个实现是否可行?或者它在某种我不知道的方面存在缺陷? (作为替代方案 - 使用队列和锁手动实现)
  2. Dispatcher 的缓冲区长度是多少?它可以处理 10 万个排队的项目吗?

下面的代码使用简单的 TestMethod 说明了我的方法。它的输出显示所有值都是从不同线程输入的,但在另一个线程上处理。

[TestMethod()]
public void RxTest()
{
Subject<string> queue = new Subject<string>();

queue
.ObserveOnDispatcher()
.Subscribe(s =>
{
Debug.WriteLine("Value: {0}, Observed on ThreadId: {1}", s, Thread.CurrentThread.ManagedThreadId);
},
() => Dispatcher.CurrentDispatcher.InvokeShutdown());

for (int j = 0; j < 10; j++)
{
ThreadPool.QueueUserWorkItem(o =>
{
for (int i = 0; i < 100; i++)
{
Thread.Sleep(10);
queue.OnNext(string.Format("value: {0}, from thread: {1}", i.ToString(), Thread.CurrentThread.ManagedThreadId));
}
queue.OnCompleted();
});
}


Dispatcher.Run();
}

最佳答案

我不确定 Subject 在大量多线程场景中的行为。我可以想象,虽然像 BlockingCollection(及其底层 ConcurrentQueue)这样的东西在你谈论的情况下已经很旧了。并且启动简单。

var queue = new BlockingCollection<long>();

// subscribing
queue.GetConsumingEnumerable()
.ToObservable(Scheduler.NewThread)
.Subscribe(i => Debug.WriteLine("Value: {0}, Observed on ThreadId: {1}", i, Thread.CurrentThread.ManagedThreadId));

// sending
Observable.Interval(TimeSpan.FromMilliseconds(500), Scheduler.ThreadPool)
.Do(i => Debug.WriteLine("Value: {0}, Sent on ThreadId: {1}", i, Thread.CurrentThread.ManagedThreadId))
.Subscribe(i => queue.Add(i));

您当然不想接触队列和锁。 ConcurrentQueue 实现非常出色,肯定会有效处理您所说的大小队列。

关于system.reactive - Rx 队列实现和 Dispatcher 的缓冲区,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/11292339/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com