gpt4 book ai didi

c# - 并发队列 C# 中的线程安全

转载 作者:太空狗 更新时间:2023-10-29 21:37:52 24 4
gpt4 key购买 nike

我有一个 MessagesManager 线程,不同的线程可以向其发送消息,然后这个 MessagesManager 线程负责在 SendMessageToTcpIP() 中发布这些消息>(MessagesManager 线程的起点)。

class MessagesManager : IMessageNotifier
{
//private
private readonly AutoResetEvent _waitTillMessageQueueEmptyARE = new AutoResetEvent(false);
private ConcurrentQueue<string> MessagesQueue = new ConcurrentQueue<string>();

public void PublishMessage(string Message)
{
MessagesQueue.Enqueue(Message);
_waitTillMessageQueueEmptyARE.Set();
}

public void SendMessageToTcpIP()
{
//keep waiting till a new message comes
while (MessagesQueue.Count() == 0)
{
_waitTillMessageQueueEmptyARE.WaitOne();
}

//Copy the Concurrent Queue into a local queue - keep dequeuing the item once it is inserts into the local Queue
Queue<string> localMessagesQueue = new Queue<string>();

while (!MessagesQueue.IsEmpty)
{
string message;
bool isRemoved = MessagesQueue.TryDequeue(out message);
if (isRemoved)
localMessagesQueue.Enqueue(message);
}

//Use the Local Queue for further processing
while (localMessagesQueue.Count() != 0)
{
TcpIpMessageSenderClient.ConnectAndSendMessage(localMessagesQueue.Dequeue().PadRight(80, ' '));
Thread.Sleep(2000);
}
}
}

不同的线程 (3-4) 通过调用 PublishMessage(string Message)(对 MessageManager 使用相同的对象)来发送它们的消息。消息到达后,我将该消息推送到并发队列,并通过设置 _waitTillMessageQueueEmptyARE.Set() 通知 SendMessageToTcpIP() ;SendMessageToTcpIP() 中,我正在从本地队列中的并发队列中复制消息,然后一条一条地发布。

问题:以这种方式进行入队和出队是否线程安全?会不会有什么奇怪的效果呢?

最佳答案

虽然这可能是线程安全的,但 .NET 中有内置类可以帮助实现“多发布者一个消费者”模式,例如 BlockingCollection。你可以像这样重写你的类:

class MessagesManager : IDisposable {
// note that your ConcurrentQueue is still in play, passed to constructor
private readonly BlockingCollection<string> MessagesQueue = new BlockingCollection<string>(new ConcurrentQueue<string>());

public MessagesManager() {
// start consumer thread here
new Thread(SendLoop) {
IsBackground = true
}.Start();
}

public void PublishMessage(string Message) {
// no need to notify here, will be done for you
MessagesQueue.Add(Message);
}

private void SendLoop() {
// this blocks until new items are available
foreach (var item in MessagesQueue.GetConsumingEnumerable()) {
// ensure that you handle exceptions here, or whole thing will break on exception
TcpIpMessageSenderClient.ConnectAndSendMessage(item.PadRight(80, ' '));
Thread.Sleep(2000); // only if you are sure this is required
}
}

public void Dispose() {
// this will "complete" GetConsumingEnumerable, so your thread will complete
MessagesQueue.CompleteAdding();
MessagesQueue.Dispose();
}
}

关于c# - 并发队列 C# 中的线程安全,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43117502/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com