gpt4 book ai didi

c# - 可以将其当前计数减少 N(N>=1)的信号量?

转载 作者:太空宇宙 更新时间:2023-11-03 14:42:14 30 4
gpt4 key购买 nike

我正在实现一个流量控制组件来限制可以发送的最大请求数。每个工作线程都可以发送单个请求或一批请求,但任何时候待处理请求的总数不应超过最大数量。

我最初想用 SemaphoreSlim 来实现:将信号量初始化为最大请求数,然后当工作线程要调用服务时,它必须获得足够数量的 token ,但是我发现实际上 SemaphoreSlim 和 Semaphore 只允许线程将信号量计数减少 1,在我的例子中我希望通过工作线程携带的请求数来减少计数。

我应该在这里使用什么同步原语?

澄清一下,该服务支持批处理,因此一个线程可以在一次服务调用中发送 N 个请求,但相应地它应该能够将信号量的当前计数减少 N。

最佳答案

下面是自定义SemaphoreManyFifo提供方法的类 Wait(int acquireCount)方法和Release(int releaseCount) .它的行为是严格的 FIFO。它具有相当不错的性能(在我的 PC 的 8 个线程上每秒约 500,000 次操作)。

public class SemaphoreManyFifo : IDisposable
{
private readonly object _locker = new object();
private readonly Queue<(ManualResetEventSlim, int AcquireCount)> _queue;
private readonly ThreadLocal<ManualResetEventSlim> _pool;
private readonly int _maxCount;
private int _currentCount;

public int CurrentCount => Volatile.Read(ref _currentCount);

public SemaphoreManyFifo(int initialCount, int maxCount)
{
// Proper arguments validation omitted
Debug.Assert(initialCount >= 0);
Debug.Assert(maxCount > 0 && maxCount >= initialCount);
_queue = new Queue<(ManualResetEventSlim, int)>();
_pool = new ThreadLocal<ManualResetEventSlim>(
() => new ManualResetEventSlim(false), trackAllValues: true);
_currentCount = initialCount;
_maxCount = maxCount;
}
public SemaphoreManyFifo(int initialCount) : this(initialCount, Int32.MaxValue) { }

public void Wait(int acquireCount)
{
Debug.Assert(acquireCount > 0 && acquireCount <= _maxCount);
ManualResetEventSlim gate;
lock (_locker)
{
Debug.Assert(_currentCount >= 0 && _currentCount <= _maxCount);
if (acquireCount <= _currentCount && _queue.Count == 0)
{
_currentCount -= acquireCount; return; // Fast path
}
gate = _pool.Value;
gate.Reset(); // Important, because the gate is reused
_queue.Enqueue((gate, acquireCount));
}
gate.Wait();
}

public void Release(int releaseCount)
{
Debug.Assert(releaseCount > 0);
lock (_locker)
{
Debug.Assert(_currentCount >= 0 && _currentCount <= _maxCount);
if (releaseCount > _maxCount - _currentCount)
throw new SemaphoreFullException();
_currentCount += releaseCount;
while (_queue.Count > 0 && _queue.Peek().AcquireCount <= _currentCount)
{
var (gate, acquireCount) = _queue.Dequeue();
_currentCount -= acquireCount;
gate.Set();
}
}
}

public void Dispose()
{
foreach (var gate in _pool.Values) gate.Dispose();
_pool.Dispose();
}
}

在上述实现中添加对超时和取消的支持并非易事。它需要一个不同的(可更新的)数据结构而不是 Queue<T> .


原文 Wait+Pulse 实现可以在 1st revision 中找到这个答案。它很简单,但缺少理想的 FIFO 行为。

关于c# - 可以将其当前计数减少 N(N>=1)的信号量?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56283112/

30 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com