gpt4 book ai didi

c# - 异步取自阻塞集合

转载 作者:可可西里 更新时间:2023-11-01 03:11:29 24 4
gpt4 key购买 nike

我正在使用 BlockingCollection实现生产者/消费者模式。我有一个异步循环,它用要处理的数据填充集合,然后客户端可以在稍后的时间访问这些数据。数据包很少到达,我希望在不使用阻塞调用的情况下完成轮询。

本质上,我正在寻找类似 BeginTake 的东西和 EndTake阻塞集合中不存在,因此我可以在回调中使用内部线程池。它不一定是 BlockingCollection以任何方式。任何满足我需要的东西都会很棒。

这就是我现在所拥有的。 _bufferedPacketsBlockingCollection<byte[]> :

public byte[] Read(int timeout)
{
byte[] result;
if (_bufferedPackets.IsCompleted)
{
throw new Exception("Out of packets");
}
_bufferedPackets.TryTake(out result, timeout);
return result;
}

我希望它是这样的,用伪代码:

public void Read(int timeout)
{
_bufferedPackets.BeginTake(result =>
{
var bytes = _bufferedPackets.EndTake(result);
// Process the bytes, or the resuting timeout
}, timeout, _bufferedPackets);
}

我对此有哪些选择?我不想将任何线程置于等待状态,因为它还有很多其他 IO 内容需要处理,而且我很快就会用完线程。

更新:我重写了相关代码以不同方式使用异步过程,本质上是根据超时限制内是否存在等待请求来交换回调。这工作正常,但如果有一种方法可以做到这一点而不诉诸计时器和交换 lambda,那仍然会很棒,因为这可能会导致竞争条件并且难以编写(和理解)。我也用自己的异步队列实现解决了这个问题,但如果有更标准且经过良好测试的选项,它仍然很棒。

最佳答案

我可能误解了你的情况,但你不能使用非阻塞集合吗?

我创建了这个例子来说明:

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

namespace AsyncTakeFromBlockingCollection
{
class Program
{
static void Main(string[] args)
{
var queue = new ConcurrentQueue<string>();

var producer1 = Task.Factory.StartNew(() =>
{
for (int i = 0; i < 10; i += 1)
{
queue.Enqueue("=======");
Thread.Sleep(10);
}
});

var producer2 = Task.Factory.StartNew(() =>
{
for (int i = 0; i < 10; i += 1)
{
queue.Enqueue("*******");
Thread.Sleep(3);
}
});

CreateConsumerTask("One ", 3, queue);
CreateConsumerTask("Two ", 4, queue);
CreateConsumerTask("Three", 7, queue);

producer1.Wait();
producer2.Wait();
Console.WriteLine(" Producers Finished");
Console.ReadLine();
}

static void CreateConsumerTask(string taskName, int sleepTime, ConcurrentQueue<string> queue)
{
Task.Factory.StartNew(() =>
{
while (true)
{
string result;
if (queue.TryDequeue(out result))
{
Console.WriteLine(" {0} consumed {1}", taskName, result);
}
Thread.Sleep(sleepTime);
}
});
}
}
}

这是程序的输出

enter image description here

我相信 BlockingCollection 旨在包装并发集合并提供一种允许多个消费者阻塞的机制;等待生产者。这种用法似乎与您的要求相反。

我找到了这个 article about the BlockingCollection class有帮助。

关于c# - 异步取自阻塞集合,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/11949424/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com