gpt4 book ai didi

c# - 如何实现可暂停的 BlockingCollection

转载 作者:行者123 更新时间:2023-11-30 14:40:02 25 4
gpt4 key购买 nike

我正在编写一个 WCF 服务,它从多个模块(数据库、其他服务..)接收通知并将它们添加到阻塞集合中,以便在将相关数据发布到客户端的使用者线程上进行处理。

客户端可以请求存储在服务器上的完整数据,在此操作期间我不想接受任何新通知。基本上我想暂停阻塞收集(或消费者线程)并在完成客户端请求后恢复通知接收和处理。

实现这种行为的好方法是什么?

最佳答案

如果我没理解错的话,你想阻止消费者线程在一些其他查询操作发生时从 BlockingCollection 中消费数据,但在此期间生产者可以继续推送数据进入收藏。

我是对的,那么我认为最好的办法是有一个 ManualResetEvent,它通常会发出信号并且消费者线程不会被阻塞,当你想暂停消费者时,你可以重置将导致每个消费者阻塞等待事件发出信号的事件。

更新:这是一个快速控制台应用程序,演示了我上面描述的内容。这只是一个快速演示,但显示了 1 个生产者线程和两个消费者线程。通过按键盘上的空格键,可以在RunningPaused 之间切换消费者的状态。

using System;
using System.Collections.Generic;
using System.Collections.Concurrent;
using System.Threading;

namespace ProducerConsumerDemo
{
class Program
{
static BlockingCollection<int> _queue = new BlockingCollection<int>();
static ManualResetEvent _pauseConsumers = new ManualResetEvent(true);
static bool _paused = false;
static int _itemsEnqueued = 0;
static int _itemsDequeued = 0;

static void Main(string[] args)
{
Thread producerThread = new Thread(Producer);
Thread consumerThread1 = new Thread(Consumer);
Thread consumerThread2 = new Thread(Consumer);
producerThread.Start();
consumerThread1.Start();
consumerThread2.Start();

while (true)
{
WriteAt(0,0,"State: " + (string)(_paused ? "Paused" : "Running"));
WriteAt(0,1,"Items In Queue: " + _queue.Count);
WriteAt(0, 2, "Total enqueued: " + _itemsEnqueued);
WriteAt(0, 3, "Total dequeued: " + _itemsDequeued);

Thread.Sleep(100);
if (Console.KeyAvailable)
{
if (Console.ReadKey().Key == ConsoleKey.Spacebar)
{
if (_paused)
{
_paused = false;
_pauseConsumers.Set();
}
else
{
_paused = true;
_pauseConsumers.Reset();
}
}
}
}
}

static void WriteAt(int x, int y, string format, params object[] args)
{
Console.SetCursorPosition(x, y);
Console.Write(" ");
Console.SetCursorPosition(x, y);
Console.Write(format, args);
}

static void Consumer()
{
while (true)
{
if (_paused)
{
// If we are paused, wait for the signal to indicate that
// we can continue
_pauseConsumers.WaitOne();
}

int value;
if (_queue.TryTake(out value))
{
Interlocked.Increment(ref _itemsDequeued);
// Do something with the data
}
Thread.Sleep(500);
}
}

static void Producer()
{
Random rnd = new Random();
while (true)
{
if (_queue.TryAdd(rnd.Next(100)))
{
Interlocked.Increment(ref _itemsEnqueued);
}
Thread.Sleep(500);
}
}
}
}

关于c# - 如何实现可暂停的 BlockingCollection,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/5821090/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com