gpt4 book ai didi

c# - 具有固定大小 FIFO 队列的生产者/消费者模式

转载 作者:太空狗 更新时间:2023-10-29 21:05:30 25 4
gpt4 key购买 nike

我需要围绕固定大小的 FIFO 队列实现生产者/消费者模式。我认为围绕 ConcurrentQueue 的包装器类可能适用于此,但我不完全确定(我以前从未使用过 ConcurrentQueue)。其中的转折点是队列只需要保存固定数量的项目(在我的例子中是字符串)。我的应用程序将有一个生产者任务/线程和一个消费者任务/线程。当我的消费者任务运行时,它需要及时取出队列中存在的所有项目并处理它们。

就其值(value)而言,我的消费者对排队项目的处理只不过是通过 SOAP 将它们上传到并非 100% 可靠的 Web 应用程序。如果无法建立连接或调用 SOAP 调用失败,我应该丢弃这些项目并返回队列以获取更多。由于 SOAP 的开销,我试图在一次 SOAP 调用中最大限度地增加队列中可以发送的项目数。

有时,我的生产者添加项目的速度可能比我的消费者能够移除和处理它们的速度快。如果队列已经满了,我的生产者需要添加另一个项目,我需要将新项目入队,然后将最旧的项目出队,以便队列的大小保持固定。基本上,我需要始终在队列中保留最新生成的项目(即使这意味着某些项目不会被消耗,因为我的消费者当前正在处理以前的项目)。

关于生产者在队列中的项目固定时保持数量,我从这个问题中发现了一个潜在的想法:

Fixed size queue which automatically dequeues old values upon new enques

我目前在 ConcurrentQueue 周围使用一个包装类(基于那个答案)和一个 Enqueue() 方法,如下所示:

public class FixedSizeQueue<T>
{
readonly ConcurrentQueue<T> queue = new ConcurrentQueue<T>();

public int Size { get; private set; }

public FixedSizeQueue(int size)
{
Size = size;
}

public void Enqueue(T obj)
{
// add item to the queue
queue.Enqueue(obj);

lock (this) // lock queue so that queue.Count is reliable
{
while (queue.Count > Size) // if queue count > max queue size, then dequeue an item
{
T objOut;
queue.TryDequeue(out objOut);
}
}
}
}

我创建了一个此类的实例,它对队列的大小有限制,如下所示:

FixedSizeQueue<string> incomingMessageQueue = new FixedSizeQueue<string>(10); // 10 item limit

我启动了生产者任务,它开始填充队列。当添加项目导致队列计数超过最大大小时,我的 Enqueue() 方法中的代码似乎在从队列中删除最旧的项目方面正常工作。现在我需要我的消费者任务来使项目出列并处理它们,但这是我的大脑感到困惑的地方。为我的消费者实现 Dequeue 方法的最佳方法是什么,该方法将在某个时刻拍摄队列快照并将所有项目出列以进行处理(生产者在此过程中可能仍在向队列中添加项目)?

最佳答案

简单地说,ConcurrentQueue 有一个“ToArray”方法,当进入该方法时,将锁定集合并生成队列中所有当前项目的“快照”。如果您希望为您的消费者提供一组工作,您可以锁定入队方法具有的同一对象,调用 ToArray(),然后旋转 while(!queue.IsEmpty) queue.TryDequeue(out trash)。循环清除队列,然后返回您提取的数组。

这将是您的 GetAll()方法:

public T[] GetAll()
{
lock (syncObj) // so that we don't clear items we didn't get with ToArray()
{
var result = queue.ToArray();
T trash;
while(!queue.IsEmpty) queue.TryDequeue(out trash);
}
}

因为你必须清除队列,你可以简单地结合这两个操作;创建一个适当大小的数组(使用 queue.Count),然后当队列不为空时,将一个项目出队并将其放入数组中,然后返回。

现在,这就是特定问题的答案。我现在必须凭良心戴上我的 CodeReview.SE 帽子并指出几件事:

  • 从不使用lock(this) .你永远不知道还有什么其他对象可能正在使用你的对象作为锁定焦点,因此当对象从内部锁定自身时会被阻止。最好的做法是锁定一个私有(private)范围的对象实例,通常是为锁定而创建的:private readonly object syncObj = new object();

  • 既然你要锁定包装器的关键部分,我会使用普通的 List<T>而不是并发集合。访问速度更快,更容易清除,因此您将能够比 ConcurrentQueue 允许的更简单地完成您正在做的事情。要入队,请在索引零之前锁定同步对象 Insert(),然后使用 RemoveRange() 从索引 Size 到列表的当前 Count 中删除任何项目。要出列,锁定同一个同步对象,调用 myList.ToArray()(来自 Linq 命名空间;与 ConcurrentQueue 的作用几乎相同),然后在返回数组之前调用 myList.Clear()。再简单不过了:

    public class FixedSizeQueue<T>
    {
    private readonly List<T> queue = new List<T>();
    private readonly object syncObj = new object();

    public int Size { get; private set; }

    public FixedSizeQueue(int size) { Size = size; }

    public void Enqueue(T obj)
    {
    lock (syncObj)
    {
    queue.Insert(0,obj)
    if(queue.Count > Size)
    queue.RemoveRange(Size, Count-Size);
    }
    }

    public T[] Dequeue()
    {
    lock (syncObj)
    {
    var result = queue.ToArray();
    queue.Clear();
    return result;
    }
    }
    }
  • 您似乎明白您正在使用此模型丢弃排队的项目。这通常不是一件好事,但我愿意相信你。但是,我会说有一种无损的方法可以实现这一点,即使用 BlockingCollection。 BlockingCollection 包装任何 IProducerConsumerCollection,包括大多数 System.Collections.Concurrent 类,并允许您指定队列的最大容量。然后该集合将阻止任何试图从空队列中出队的线程,或任何试图添加到已满队列的线程,直到添加或删除项目以便有东西可以获取或有空间可以插入。这是实现具有最大大小的生产者-消费者队列的最佳方式,或者是实现需要“轮询”以查看是否有消费者需要处理的队列的最佳方式。如果你走这条路,只有消费者必须扔掉的才会被扔掉;消费者将看到生产者放入的所有行,并对每一行做出自己的决定。

关于c# - 具有固定大小 FIFO 队列的生产者/消费者模式,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/12410777/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com