gpt4 book ai didi

c#-4.0 - 如何将 IObservable/IObserver 与 ConcurrentQueue 或 ConcurrentStack 一起使用

转载 作者:行者123 更新时间:2023-12-04 17:41:27 27 4
gpt4 key购买 nike

我意识到当我尝试使用多个线程处理并发队列中的项目而多个线程可以将项目放入其中时,理想的解决方案是使用具有并发数据结构的响应式(Reactive)扩展。

我原来的问题是:

While using ConcurrentQueue, trying to dequeue while looping through in parallel

所以我很好奇是否有任何方法可以让 LINQ(或 PLINQ)查询在项目放入其中时不断出队。

我试图让它以一种可以让 n 个生产者进入队列和有限数量的线程来处理的方式工作,所以我不会使数据库过载。

如果我可以使用 Rx 框架,那么我希望我可以直接启动它,如果在 100 毫秒内放置了 100 个项目,那么作为 PLINQ 查询一部分的 20 个线程将通过队列进行处理。

我正在尝试结合三种技术:

  • Rx 框架(响应式(Reactive) LINQ)
  • PLING
  • 系统.集合.并发
    结构
  • 最佳答案

    Drew 是对的,我认为 ConcurrentQueue 尽管它听起来很适合这项工作,但实际上是 BlockingCollection 使用的底层数据结构。对我来说似乎也很前后。
    查看本书的第 7 章*
    http://www.amazon.co.uk/Parallel-Programming-Microsoft-NET-Decomposition/dp/0735651590/ref=sr_1_1?ie=UTF8&qid=1294319704&sr=8-1
    它将解释如何使用 BlockingCollection 以及如何让多个生产者和多个消费者各自离开“队列”。您将需要查看“GetConsumingEnumerable()”方法,并且可能只是在其上调用 .ToObservable()。

    *本书的其余部分非常平均。

    编辑:

    这是一个示例程序,我认为它可以满足您的需求?

    class Program
    {
    private static ManualResetEvent _mre = new ManualResetEvent(false);
    static void Main(string[] args)
    {
    var theQueue = new BlockingCollection<string>();
    theQueue.GetConsumingEnumerable()
    .ToObservable(Scheduler.TaskPool)
    .Subscribe(x => ProcessNewValue(x, "Consumer 1", 10000000));

    theQueue.GetConsumingEnumerable()
    .ToObservable(Scheduler.TaskPool)
    .Subscribe(x => ProcessNewValue(x, "Consumer 2", 50000000));

    theQueue.GetConsumingEnumerable()
    .ToObservable(Scheduler.TaskPool)
    .Subscribe(x => ProcessNewValue(x, "Consumer 3", 30000000));


    LoadQueue(theQueue, "Producer A");
    LoadQueue(theQueue, "Producer B");
    LoadQueue(theQueue, "Producer C");

    _mre.Set();

    Console.WriteLine("Processing now....");

    Console.ReadLine();
    }

    private static void ProcessNewValue(string value, string consumerName, int delay)
    {
    Thread.SpinWait(delay);
    Console.WriteLine("{1} consuming {0}", value, consumerName);
    }

    private static void LoadQueue(BlockingCollection<string> target, string prefix)
    {
    var thread = new Thread(() =>
    {
    _mre.WaitOne();
    for (int i = 0; i < 100; i++)
    {
    target.Add(string.Format("{0} {1}", prefix, i));
    }
    });
    thread.Start();
    }
    }

    关于c#-4.0 - 如何将 IObservable/IObserver 与 ConcurrentQueue 或 ConcurrentStack 一起使用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/3030649/

    27 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com