gpt4 book ai didi

c# - 拥有资源的生产者-消费者

转载 作者:行者123 更新时间:2023-12-03 13:21:55 32 4
gpt4 key购买 nike

我正在尝试使用一组资源来实现生产者/消费者模式,因此每个线程都有一个与之关联的资源。例如,我可能有一个任务队列,其中每个任务都需要一个 StreamWriter写出它的结果。每个任务还​​必须有参数传递给它。

我从 Joseph Albahari 的实现开始(见下文我的修改版本)。

我替换了Action的队列队列为 Action<T>在哪里 T是资源,并将与线程关联的资源传递给Action .但是,这给我留下了如何将参数传递给 Action 的问题。 .显然,Action必须用委托(delegate)替换,但这留下了在任务入队时如何传递参数的问题(从 ProducerConsumerQueue 类外部)。关于如何做到这一点的任何想法?

class ProducerConsumerQueue<T>
{
readonly object _locker = new object();
Thread[] _workers;
Queue<Action<T>> _itemQ = new Queue<Action<T>>();

public ProducerConsumerQueue(T[] resources)
{
_workers = new Thread[resources.Length];

// Create and start a separate thread for each worker
for (int i = 0; i < resources.Length; i++)
{
Thread thread = new Thread(() => Consume(resources[i]));
thread.SetApartmentState(ApartmentState.STA);
_workers[i] = thread;
_workers[i].Start();
}
}

public void Shutdown(bool waitForWorkers)
{
// Enqueue one null item per worker to make each exit.
foreach (Thread worker in _workers)
EnqueueItem(null);

// Wait for workers to finish
if (waitForWorkers)
foreach (Thread worker in _workers)
worker.Join();
}

public void EnqueueItem(Action<T> item)
{
lock (_locker)
{
_itemQ.Enqueue(item); // We must pulse because we're
Monitor.Pulse(_locker); // changing a blocking condition.
}
}

void Consume(T parameter)
{
while (true) // Keep consuming until
{ // told otherwise.
Action<T> item;
lock (_locker)
{
while (_itemQ.Count == 0) Monitor.Wait(_locker);
item = _itemQ.Dequeue();
}
if (item == null) return; // This signals our exit.
item(parameter); // Execute item.
}
}
}

最佳答案

类型TProducerConsumerQueue<T>不必是您的资源,它可以是包含您的资源的复合类型。使用 .NET4 最简单的方法是使用 Tuple<StreamWriter, YourParameterType> .生产/消费者队列只是吃东西然后吐出T所以在你的Action<T>您可以只使用属性来获取资源和参数。如果您使用 Tuple你会使用 Item1获取资源和Item2获取参数。

如果您不使用 .NET4,则过程类似,但您只需创建自己的类:

public class WorkItem<T>
{
private StreamWriter resource;
private T parameter;

public WorkItem(StreamWriter resource, T parameter)
{
this.resource = resource;
this.parameter = parameter;
}

public StreamWriter Resource { get { return resource; } }
public T Parameter { get { return parameter; } }
}

事实上,将其通用化可能会针对您的情况进行过度设计。您可以将 T 定义为您想要的类型。

此外,作为引用,.NET4 中包含了执行多线程的新方法,这些方法可能适用于您的用例,例如并发队列和并行任务库。它们还可以与信号量等传统方法结合使用。

编辑:

继续使用这种方法,这里有一个小示例类,它演示了如何使用:
  • 控制对有限资源的访问的信号量
  • 一个并发队列,用于在线程之间安全地管理该资源
  • 使用任务并行库的任务管理

  • 这里是 Processor类(class):
    public class Processor
    {
    private const int count = 3;
    private ConcurrentQueue<StreamWriter> queue = new ConcurrentQueue<StreamWriter>();
    private Semaphore semaphore = new Semaphore(count, count);

    public Processor()
    {
    // Populate the resource queue.
    for (int i = 0; i < count; i++) queue.Enqueue(new StreamWriter("sample" + i));
    }

    public void Process(int parameter)
    {
    // Wait for one of our resources to become free.
    semaphore.WaitOne();
    StreamWriter resource;
    queue.TryDequeue(out resource);

    // Dispatch the work to a task.
    Task.Factory.StartNew(() => Process(resource, parameter));
    }

    private Random random = new Random();

    private void Process(StreamWriter resource, int parameter)
    {
    // Do work in background with resource.
    Thread.Sleep(random.Next(10) * 100);
    resource.WriteLine("Parameter = {0}", parameter);
    queue.Enqueue(resource);
    semaphore.Release();
    }
    }

    现在我们可以像这样使用这个类:
    var processor = new Processor();
    for (int i = 0; i < 10; i++)
    processor.Process(i);

    并且同时调度的任务不超过三个,每个任务都有自己的 StreamWriter被回收的资源。

    关于c# - 拥有资源的生产者-消费者,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/6003671/

    32 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com