- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我意识到当我尝试使用多个线程处理并发队列中的项目而多个线程可以将项目放入其中时,理想的解决方案是使用具有并发数据结构的响应式(Reactive)扩展。
我原来的问题是:
While using ConcurrentQueue, trying to dequeue while looping through in parallel
所以我很好奇是否有任何方法可以让 LINQ(或 PLINQ)查询在项目放入其中时不断出队。
我试图让它以一种可以让 n 个生产者进入队列和有限数量的线程来处理的方式工作,所以我不会使数据库过载。
如果我可以使用 Rx 框架,那么我希望我可以直接启动它,如果在 100 毫秒内放置了 100 个项目,那么作为 PLINQ 查询一部分的 20 个线程将通过队列进行处理。
我正在尝试结合三种技术:
最佳答案
Drew 是对的,我认为 ConcurrentQueue 尽管它听起来很适合这项工作,但实际上是 BlockingCollection 使用的底层数据结构。对我来说似乎也很前后。
查看本书的第 7 章*
http://www.amazon.co.uk/Parallel-Programming-Microsoft-NET-Decomposition/dp/0735651590/ref=sr_1_1?ie=UTF8&qid=1294319704&sr=8-1
它将解释如何使用 BlockingCollection 以及如何让多个生产者和多个消费者各自离开“队列”。您将需要查看“GetConsumingEnumerable()”方法,并且可能只是在其上调用 .ToObservable()。
*本书的其余部分非常平均。
编辑:
这是一个示例程序,我认为它可以满足您的需求?
class Program
{
private static ManualResetEvent _mre = new ManualResetEvent(false);
static void Main(string[] args)
{
var theQueue = new BlockingCollection<string>();
theQueue.GetConsumingEnumerable()
.ToObservable(Scheduler.TaskPool)
.Subscribe(x => ProcessNewValue(x, "Consumer 1", 10000000));
theQueue.GetConsumingEnumerable()
.ToObservable(Scheduler.TaskPool)
.Subscribe(x => ProcessNewValue(x, "Consumer 2", 50000000));
theQueue.GetConsumingEnumerable()
.ToObservable(Scheduler.TaskPool)
.Subscribe(x => ProcessNewValue(x, "Consumer 3", 30000000));
LoadQueue(theQueue, "Producer A");
LoadQueue(theQueue, "Producer B");
LoadQueue(theQueue, "Producer C");
_mre.Set();
Console.WriteLine("Processing now....");
Console.ReadLine();
}
private static void ProcessNewValue(string value, string consumerName, int delay)
{
Thread.SpinWait(delay);
Console.WriteLine("{1} consuming {0}", value, consumerName);
}
private static void LoadQueue(BlockingCollection<string> target, string prefix)
{
var thread = new Thread(() =>
{
_mre.WaitOne();
for (int i = 0; i < 100; i++)
{
target.Add(string.Format("{0} {1}", prefix, i));
}
});
thread.Start();
}
}
关于c#-4.0 - 如何将 IObservable/IObserver 与 ConcurrentQueue 或 ConcurrentStack 一起使用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/3030649/
我是一名优秀的程序员,十分优秀!