gpt4 book ai didi

c# - 生产者-消费者同步问题

转载 作者:行者123 更新时间:2023-12-03 13:07:35 41 4
gpt4 key购买 nike

我正在尝试编写一个简单的生产者-消费者应用程序,在该应用程序中,我需要从文件中读取大块数据(可能很大),并且(出于简单测试目的)只需通过另一个线程将其写入另一个文件中即可。

我尝试了很多在线资源,但是这些线程同步任务使我难以理解,发现的每个示例对我来说都缺少一些重要方面。

我已经整理了ALMOST似乎可以工作的代码段,但是与线程相关的某些事情显然是错误的,因此我想问问您是否有人可以发现我在做什么错。
如果我在下面的程序中运行某些测试文件,则该程序会完成OK (至少对我和我的测试文件而言),但如果我在Thread.Sleep(20)方法中取消对dequeueObjectAndWriteItToFile的注释,则(以测试生产者更快时发生的情况)然后,使用者(根据控制台中打印的数据)将生产者在队列中插入maxQueueSize + 1数据块,并将插入程序,使其陷入无限循环或的状态。

我怀疑 _producerThreadWaitEventHandler.Set() 调用可能是问题的一部分,因为目前每个while循环都会在dequeueObjectAndWriteItToFile中调用它(我只想在必要时调用它,即,如果 _producerThreadWaitEventHandler.waitOne() 已被调用我应该唤醒该线程,但是我不知道如何找出是否已为特定线程调用waitOne来唤醒线程)。
当然,可能还存在其他同步问题,但是由于我是多线程技术的新手,所以我不知道首先看哪里以及什么是最佳解决方案。

请注意,我想使用(并了解)基本技术(例如 Monitor AutoResetEvent )来进行同步(而不是BlockingQueue,TPL等),因此我希望对以下代码进行一些细微调整就能使其正常工作。

如有任何提示,我将不胜感激。

谢谢。

using System;
using System.Threading;
using System.Collections.Generic;
using System.IO;

class ProducerConsumerApp : IDisposable
{
public static string originalFilePath = @"D:\test.txt";
public static string outputFilePath = @"D:\test_export.txt";
public static int blockSize = 15;

int maxQueueSize = 4; // max allowed number of objects in the queue

EventWaitHandle _consumerThreadWaitEventHandler = new AutoResetEvent(false);
EventWaitHandle _producerThreadWaitEventHandler = new AutoResetEvent(false);

Thread _consumerThread;
readonly object _lock = new object();
Queue<byte[]> _queue = new Queue<byte[]>();

public ProducerConsumerApp(Stream outputStream)
{
_consumerThread = new Thread(dequeueObjectAndWriteItToFile);
_consumerThread.Start(outputStream);
}

public void enqueueObject(byte[] data)
{
lock (_lock)
{
// TODO !!!
// Make sure producent doesn't enqueue more objects than the maxQueueSize is,
// i.e. let the producent wait until consumer dequeues some object from the full queue
if (_queue.Count > maxQueueSize) // would "while" be better? Doesn't seem to change anything
{
_producerThreadWaitEventHandler.WaitOne();
}
// Thread.Sleep(20); // just for testing

_queue.Enqueue(data);

// data being read in case of a text file:
//string str = (data==null) ? "<null>" : System.Text.Encoding.Default.GetString(data);
//Console.WriteLine("Enqueuing data: "+str);

}
_consumerThreadWaitEventHandler.Set(); // data enqueued => wake the consumerThread
}

public void Dispose() // called automatically (IDisposable implementer) when instance is being destroyed
{
enqueueObject(null); // Signal the consumer to exit.
_consumerThread.Join(); // Wait for the consumer's thread to finish.
_consumerThreadWaitEventHandler.Close(); // Release any OS resources.
}

void dequeueObjectAndWriteItToFile(object outputStream)
{
while (true)
{
// Thread.Sleep(20); // slow down the consumerThread to check what happens when the producer fully fills the queue
// PROBLEM - the app gets into some infinite loop if I do this!!! What exactly is wrong?

byte[] data = null;
lock (_lock)
if (_queue.Count > 0) // queue not empty
{
data = _queue.Dequeue();

_producerThreadWaitEventHandler.Set();
// !!! This doesn't seem right - I don't want to call this in each while iteration
// I would like to call it only if _producerThreadWaitEventHandler.WaitOne has been called
// but how to check such a condition?

if (data == null)
{
// Console.WriteLine("Data file reading finished => let consumerThread finish and then quit the app");
return;
}
}
if (data != null)
{
((FileStream)outputStream).Write(data, 0, data.Length); // write data from the queue to a file

// just a test in case of a text file:
// string str = System.Text.Encoding.Default.GetString(data);
// Console.WriteLine("Data block retrieved from the queue and written to a file: " + str);

} else { // empty queue => let the consumerThread wait
_consumerThreadWaitEventHandler.WaitOne(); // No more tasks - wait for a signal
}
}
}

static void Main()
{

FileInfo originalFile = new FileInfo(originalFilePath);
byte[] data = new byte[blockSize];
int bytesRead;

using (FileStream originalFileStream = originalFile.OpenRead()) // file input stream
using (FileStream fileOutputStream = new FileStream(outputFilePath, FileMode.Create, FileAccess.Write))
using (ProducerConsumerApp q = new ProducerConsumerApp(fileOutputStream))
{
while ((bytesRead = originalFileStream.Read(data, 0, blockSize)) > 0) // reads blocks of data from a file
{
// test - in case of a text file:
//string str = System.Text.Encoding.Default.GetString(data);
//Console.WriteLine("data block read from a file:" + str);

if (bytesRead < data.Length)
{
byte[] data2 = new byte[bytesRead];
Array.Copy(data, data2, bytesRead);
data = data2;
}

q.enqueueObject(data); // put the data into the queue

data = new byte[blockSize];
}
}
// because of "using" the Dispose method is going to be called in the end which will call enqueueObject(null) resulting in stopping the consumer thread

Console.WriteLine("Finish");
}
}

最佳答案

您的问题是您在锁中等待。这意味着另一个线程也将在lock语句上阻塞,并且永远不会调用_producerThreadWaitEventHandler.Set(); Classical死锁。

最好使用Semaphore 来限制农产品可以放入队列中的项目数。
将信号量初始化为全部免费:producerSemaphore = new Semaphore (15, 15);。在生产者中,等待信号量,在消费者中,调用Release()

以相同的方式,您可以使用Semaphore或CountdownEvent来避免依赖queue.Count

更好的是,您可以将ConcurrentQueue与信号量结合使用,以确保生产者不会使队列过满。
如果您成功地从队列中将项目出队,请调用producerSemaphore.Release();

关于c# - 生产者-消费者同步问题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60424503/

41 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com