gpt4 book ai didi

c# - 定时器+生产者消费者

转载 作者:太空宇宙 更新时间:2023-11-03 14:48:14 26 4
gpt4 key购买 nike

我是线程的新手,我正在尝试学习不同的概念。

现在我正在用一个 Timer 线程来执行生产者/消费者模式。问题是:我不知道如何在 Timer 线程等待下一次触发之前,检查所有生产者和消费者线程是否都已完成工作,并在下一次触发之前释放所有已创建的生产者和消费者线程。

想寻求您的帮助和指导,了解如何为这种方法创建变通办法。

这是我的示例代码:

/// <summary>
/// Timer-driven host for one producer/consumer run over a shared bounded queue.
/// The thread start-up and wait logic is still marked TODO below.
/// </summary>
public class WorkerThread
{
    /// <summary>Bounded queue (capacity 100) shared by the producer and consumer methods.</summary>
    public BlockingQueue<Item> collection = new BlockingQueue<Item>(100);

    private Timer TimerThread { get; set; }

    /// <summary>
    /// Creates the timer so that <see cref="StartMonitor"/> fires once after 500 ms.
    /// The period is <see cref="Timeout.Infinite"/>, so the timer does not repeat by itself.
    /// </summary>
    public void ThreadTimer()
    {
        TimerCallback onTick = StartMonitor;
        TimerThread = new Timer(onTick, null, 500, Timeout.Infinite);
    }

    /// <summary>
    /// Timer callback: builds the producer and consumer thread lists and then
    /// re-arms the timer for another single shot 5000 ms later.
    /// </summary>
    /// <param name="state">Timer state object; not used.</param>
    public void StartMonitor(object state)
    {
        const int producerCount = 1;
        const int consumerCount = 2;

        var producers = new List<Thread>();
        var consumers = new List<Thread>();

        while (producers.Count < producerCount)
        {
            producers.Add(new Thread(() => RunProducers(collection)));
        }

        //TODO: Start all producer threads...

        while (consumers.Count < consumerCount)
        {
            consumers.Add(new Thread(() => RunConsumers(collection)));
        }

        //TODO: Start all consumer threads...

        //TODO: Let Thread wait until all worker threads are done
        //TODO: Dispose Threads

        TimerThread.Change(5000, Timeout.Infinite);
    }

    /// <summary>
    /// Generates the item list and adds every item, in order, to the given queue.
    /// </summary>
    /// <param name="collection">Queue that receives the generated items.</param>
    public void RunProducers(BlockingQueue<Item> collection)
    {
        List<Item> batch = CreateListOfItems();

        for (int index = 0; index < batch.Count; index++)
        {
            collection.Add(batch[index]);
        }
    }

    /// <summary>
    /// Takes items from the queue and prints each one. The loop has no exit
    /// condition, so this method does not return.
    /// </summary>
    /// <param name="collection">Queue to take items from.</param>
    public void RunConsumers(BlockingQueue<Item> collection)
    {
        for (;;)
        {
            Item next = collection.Take();
            Console.WriteLine("Processed[{0}] : {1}", next.ID, next.Name);
            //Thread.Sleep(100);
        }
    }

    /// <summary>
    /// Builds 10000 items with IDs 0 through 9999 and names of the form "Item[n]".
    /// </summary>
    /// <returns>The newly created list.</returns>
    public List<Item> CreateListOfItems()
    {
        var result = new List<Item>();

        for (int id = 0; id < 10000; id++)
        {
            var entry = new Item();
            entry.ID = id;
            entry.Name = "Item[" + id + "]";
            result.Add(entry);
        }

        return result;
    }
}

BlockingCollection 的替代实现(由于我们的环境是 .NET 3.5,无法使用更高版本框架中才提供的库):

/// <summary>
/// Bounded FIFO queue guarded by a monitor on the inner queue object.
/// Add waits while the queue is full; Take and TryDequeue wait while it is empty.
/// </summary>
public class BlockingQueue<T>
{
    private readonly Queue<T> items = new Queue<T>();
    private readonly int capacity;

    /// <summary>
    /// Set to true by <see cref="Close"/>. Only <see cref="TryDequeue"/> consults it;
    /// Add and Take do not.
    /// </summary>
    public bool closing;

    /// <summary>Creates a queue that holds at most <paramref name="maxSize"/> items.</summary>
    /// <param name="maxSize">Item count at which Add starts to wait.</param>
    public BlockingQueue(int maxSize)
    {
        capacity = maxSize;
    }

    /// <summary>
    /// Appends an item, waiting first while the queue holds
    /// <c>maxSize</c> or more items.
    /// </summary>
    /// <param name="item">Item to enqueue.</param>
    public void Add(T item)
    {
        lock (items)
        {
            while (items.Count >= capacity)
            {
                Monitor.Wait(items);
            }

            bool wasEmpty = items.Count == 0;
            items.Enqueue(item);

            // Only the empty -> non-empty transition can unblock a waiting reader.
            if (wasEmpty)
            {
                Monitor.PulseAll(items);
            }
        }
    }

    /// <summary>
    /// Removes and returns the oldest item, waiting while the queue is empty.
    /// The closing flag is not checked here.
    /// </summary>
    /// <returns>The item at the head of the queue.</returns>
    public T Take()
    {
        lock (items)
        {
            while (items.Count == 0)
            {
                Monitor.Wait(items);
            }

            return RemoveHead();
        }
    }

    /// <summary>Marks the queue as closing and wakes every thread waiting on it.</summary>
    public void Close()
    {
        lock (items)
        {
            closing = true;
            Monitor.PulseAll(items);
        }
    }

    /// <summary>
    /// Removes the oldest item if one is or becomes available. Gives up when the
    /// queue is empty and <see cref="closing"/> is set.
    /// </summary>
    /// <param name="value">The dequeued item, or default(T) when false is returned.</param>
    /// <returns>true if an item was dequeued; false if the queue is empty and closing.</returns>
    public bool TryDequeue(out T value)
    {
        lock (items)
        {
            for (;;)
            {
                if (items.Count > 0)
                {
                    value = RemoveHead();
                    return true;
                }

                if (closing)
                {
                    value = default(T);
                    return false;
                }

                Monitor.Wait(items);
            }
        }
    }

    // Dequeues the head item and pulses waiters on the full -> not-full transition.
    // The caller must already hold the lock on `items`.
    private T RemoveHead()
    {
        T head = items.Dequeue();

        if (items.Count == capacity - 1)
        {
            Monitor.PulseAll(items);
        }

        return head;
    }
}

最佳答案

您可以直接检查所有工作线程的 IsAlive 属性。这段代码看起来不太简洁,但可以工作:

/// <summary>
/// Timer callback: creates and starts one batch of producer and consumer threads,
/// polls their IsAlive property every 50 ms until none is still running, and then
/// re-arms the timer for a single shot 5000 ms later.
/// </summary>
/// <param name="state">Timer state object; not used.</param>
public void StartMonitor(object state)
{
    List<Thread> producers = new List<Thread>();
    List<Thread> consumers = new List<Thread>();

    for (int i = 0; i < 1; i++)
    {
        producers.Add(new Thread(() => RunProducers(this.collection)));
    }

    // A thread that was never started reports IsAlive == false, which would make
    // the poll below finish immediately, so the workers must be started first.
    foreach (Thread producer in producers)
    {
        producer.Start();
    }

    for (int i = 0; i < 2; i++)
    {
        consumers.Add(new Thread(() => RunConsumers(this.collection)));
    }

    foreach (Thread consumer in consumers)
    {
        consumer.Start();
    }

    // Wait until all worker threads are done, dropping finished threads from
    // the list on every pass.
    // NOTE(review): this ends only when every worker method returns. A consumer
    // that loops forever on Take() keeps the poll running indefinitely.
    List<Thread> pending = new List<Thread>(producers);
    pending.AddRange(consumers);

    while (pending.Count > 0)
    {
        Thread.Sleep(50);
        pending.RemoveAll(worker => !worker.IsAlive);
    }

    // Thread does not implement IDisposable, so there is nothing to dispose here.

    TimerThread.Change(5000, Timeout.Infinite);
}

或者,也许更好的方法:

    // Number of worker threads that have been registered and have not yet finished.
    // Every access goes through Interlocked so that increments and decrements
    // coming from different threads cannot be lost.
    private int _counter;

    /// <summary>
    /// Outstanding-worker count. Reading or assigning it is atomic, but
    /// <c>Counter++</c> / <c>Counter--</c> would still be a separate read followed
    /// by a write, so updates use Interlocked.Increment / Interlocked.Decrement
    /// on the backing field instead.
    /// </summary>
    private int Counter
    {
        get { return Interlocked.CompareExchange(ref _counter, 0, 0); }
        set { Interlocked.Exchange(ref _counter, value); }
    }

    /// <summary>
    /// Timer callback: creates and starts one batch of producer and consumer threads,
    /// waits until the outstanding-worker count drops back to zero, and then
    /// re-arms the timer for a single shot 5000 ms later.
    /// </summary>
    /// <param name="state">Timer state object; not used.</param>
    public void StartMonitor(object state)
    {
        List<Thread> producers = new List<Thread>();
        List<Thread> consumers = new List<Thread>();
        Counter = 0;

        for (int i = 0; i < 1; i++)
        {
            producers.Add(new Thread(() =>
            {
                // finally: the count must drop even if the worker method throws.
                try { RunProducers(this.collection); }
                finally { Interlocked.Decrement(ref _counter); }
            }));
        }

        foreach (Thread producer in producers)
        {
            StartCounted(producer);
        }

        for (int i = 0; i < 2; i++)
        {
            consumers.Add(new Thread(() =>
            {
                try { RunConsumers(this.collection); }
                finally { Interlocked.Decrement(ref _counter); }
            }));
        }

        foreach (Thread consumer in consumers)
        {
            StartCounted(consumer);
        }

        // Wait until all worker threads are done.
        // NOTE(review): this ends only when every worker method returns. A consumer
        // that loops forever on Take() keeps the count above zero indefinitely.
        while (Counter > 0)
        {
            Thread.Sleep(50);
        }

        // Thread does not implement IDisposable, so there is nothing to dispose here.

        TimerThread.Change(5000, Timeout.Infinite);
    }

    // Registers the worker in the count BEFORE starting it. If the worker counted
    // itself from inside its own thread, the wait loop could observe zero before
    // the worker had been scheduled and return too early.
    private void StartCounted(Thread worker)
    {
        Interlocked.Increment(ref _counter);
        try
        {
            worker.Start();
        }
        catch
        {
            // The thread never ran, so its finally block will not undo the count.
            Interlocked.Decrement(ref _counter);
            throw;
        }
    }

为了避免使用 Sleep(),您可以使用 Barrier 类(注意:Barrier 自 .NET Framework 4.0 起才提供,在 .NET 3.5 环境中不可用):

    /// <summary>
    /// Timer callback: creates and starts one batch of producer and consumer threads,
    /// blocks on a Barrier until every worker has signalled completion, and then
    /// re-arms the timer for a single shot 5000 ms later.
    /// </summary>
    /// <param name="state">Timer state object; not used.</param>
    public void StartMonitor(object state)
    {
        List<Thread> producers = new List<Thread>();
        List<Thread> consumers = new List<Thread>();
        int producerCount = 1;
        int consumerCount = 2;

        // One participant per worker plus one for this monitoring thread. The
        // loops below use the same counts, so the participant total always
        // matches the number of threads actually created.
        // NOTE(review): Barrier implements IDisposable; confirm where it can be
        // disposed without racing workers that are still leaving SignalAndWait.
        Barrier b = new Barrier(producerCount + consumerCount + 1);

        for (int i = 0; i < producerCount; i++)
        {
            producers.Add(new Thread(() =>
            {
                // finally: signal the barrier even if the worker method throws.
                try { RunProducers(this.collection); }
                finally { b.SignalAndWait(); }
            }));
        }

        // Workers must be running, otherwise the barrier below never completes.
        foreach (Thread producer in producers)
        {
            producer.Start();
        }

        for (int i = 0; i < consumerCount; i++)
        {
            consumers.Add(new Thread(() =>
            {
                try { RunConsumers(this.collection); }
                finally { b.SignalAndWait(); }
            }));
        }

        foreach (Thread consumer in consumers)
        {
            consumer.Start();
        }

        // Wait until all worker threads are done. This call is deliberately not in
        // a finally block: if set-up above fails, waiting for participants that
        // were never started would hang instead of surfacing the exception.
        // NOTE(review): this ends only when every worker method returns. A consumer
        // that loops forever on Take() never reaches the barrier.
        b.SignalAndWait();

        // Thread does not implement IDisposable, so there is nothing to dispose here.

        TimerThread.Change(5000, Timeout.Infinite);
    }

关于c# - 定时器+生产者消费者,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53202228/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com