gpt4 book ai didi

c# - 简单的内存消息队列

转载 作者:太空狗 更新时间:2023-10-30 01:32:42 25 4
gpt4 key购买 nike

我们现有的域事件实现限制(通过阻塞)一次发布到一个线程,以避免对处理程序的重入调用:

public interface IDomainEvent {}  // Marker interface

public class Dispatcher : IDisposable
{
private readonly SemaphoreSlim semaphore = new SemaphoreSlim(1, 1);

// Subscribe code...

public void Publish(IDomainEvent domainEvent)
{
semaphore.Wait();
try
{
// Get event subscriber(s) from concurrent dictionary...

foreach (Action<IDomainEvent> subscriber in eventSubscribers)
{
subscriber(domainEvent);
}
}
finally
{
semaphore.Release();
}
}
// Dispose pattern...
}

如果一个处理程序发布一个事件,这将死锁。

我如何重写它以序列化对 Publish 的调用?换句话说,如果订阅处理程序 A 发布事件 B,我将得到:

  1. 处理程序 A 调用
  2. 处理程序 B 调用

同时在多线程环境中保留对处理程序的不可重入调用的条件。

我不想更改公共(public)方法签名;例如,应用程序中没有地方可以调用方法来发布队列。

最佳答案

我们想出了一种同步进行的方法。

public class Dispatcher : IDisposable
{
private readonly ConcurrentQueue<IDomainEvent> queue = new ConcurrentQueue<IDomainEvent>();
private readonly SemaphoreSlim semaphore = new SemaphoreSlim(1, 1);

// Subscribe code...

public void Publish(IDomainEvent domainEvent)
{
queue.Enqueue(domainEvent);

if (IsPublishing)
{
return;
}

PublishQueue();
}

private void PublishQueue()
{
IDomainEvent domainEvent;
while (queue.TryDequeue(out domainEvent))
{
InternalPublish(domainEvent);
}
}

private void InternalPublish(IDomainEvent domainEvent)
{
semaphore.Wait();
try
{
// Get event subscriber(s) from concurrent dictionary...

foreach (Action<IDomainEvent> subscriber in eventSubscribers)
{
subscriber(domainEvent);
}
}
finally
{
semaphore.Release();
}

// Necessary, as calls to Publish during publishing could have queued events and returned.
PublishQueue();
}

private bool IsPublishing
{
get { return semaphore.CurrentCount < 1; }
}
// Dispose pattern for semaphore...
}

关于c# - 简单的内存消息队列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36436034/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com