gpt4 book ai didi

rebus - 如何在消息处理程序中立即停止处理新消息?

转载 作者:行者123 更新时间:2023-12-05 04:43:20 25 4
gpt4 key购买 nike

我有一个 Rebus 总线设置,只有一个工作人员,最大并行度为 1,可以“顺序”处理消息。如果处理程序失败,或出于特定业务原因,我希望总线实例立即停止处理消息。

我尝试使用 Rebus.Event 包来检测 AfterMessageHandled 处理程序中的异常并将工作人员数量设置为 0,但似乎在它真正成功停止单个工作人员实例之前处理了其他消息。

我可以在事件处理管道中的什么地方做bus.Advanced.Workers.SetNumberOfWorkers(0); 为了防止进一步的消息处理?

我还尝试在处理程序本身的 catch block 内将工作人员数量设置为 0,但它似乎不是正确的地方,因为 SetNumberOfWorkers(0) 等待处理程序在返回之前完成并且调用者是处理程序......对我来说看起来像是某种僵局。

谢谢

最佳答案

这种特殊情况有点进退两难,因为正如您正确观察到的那样,SetNumberOfWorkers 是一个阻塞函数,它将等待直到达到所需的线程数。

在您的情况下,由于您将其设置为零,这意味着您的消息处理程序需要在线程数达到零之前完成......然后:💣☠🔒

我很抱歉这么说,因为我敢打赌你想这样做是因为你不知何故处于困境——但总的来说,我必须说,想要按顺序处理消息并按消息队列的顺序处理消息是在乞求麻烦,因为有太多事情会导致消息被重新排序。

但是,我认为你可以通过安装一个传输装饰器来解决你的问题,它会在切换时绕过真正的传输。如果装饰器随后从 Receive 方法返回 null,它将触发 Rebus 的内置退避策略并开始冷却(即它会增加轮询之间的等待时间运输)。

检查一下——首先,让我们创建一个简单的、线程安全的开关:

public class MessageHandlingToggle
{
public volatile bool ProcessMessages = true;
}

(您可能想以某种方式结束并制作漂亮的内容,但现在应该这样做)

然后我们将在容器中将其注册为单例(假设此处为 Microsoft DI):

services.AddSingleton(new MessageHandlingToggle());

我们将使用 ProcessMessages 标志来指示是否应启用消息处理。

现在,当您配置 Rebus 时,您可以装饰传输并让装饰器访问容器中的切换实例:

services.AddRebus((configure, provider) =>
configure
.Transport(t => {
t.Use(...);

// install transport decorator here
t.Decorate(c => {
var transport = c.Get<ITransport>();
var toggle = provider.GetRequiredService<MessageHandlingToggle>();
return new MessageHandlingToggleTransportDecorator(transport, toggle);
})
})
.(...)
);

所以,现在您只需要构建装饰器:

public class MessageHandlingToggleTransportDecorator : ITransport
{
static readonly Task<TransportMessage> NoMessage = Task.FromResult(null);

readonly ITransport _transport;
readonly MessageHandlingToggle _toggle;

public MessageHandlingToggleTransportDecorator(ITransport transport, MessageHandlingToggle toggle)
{
_transport = transport;
_toggle = toggle;
}

public string Address => _transport.Address;

public void CreateQueue(string address) => _transport.CreateQueue(address);

public Task Send(string destinationAddress, TransportMessage message, ITransactionContext context)
=> _transport.Send(destinationAddress, message, context);

public Task<TransportMessage> Receive(ITransactionContext context, CancellationToken cancellationToken)
=> _toggle.ProcessMessages
? _transport.Receive(context, cancellationToken)
: NoMessage;
}

如您所见,当 ProcessMessages == false 时,它只会返回 null。唯一剩下的就是决定何时再次恢复处理消息,以某种方式从容器中拉出 MessageHandlingToggle(可能是通过注入(inject)),然后将 bool 弹回到

我希望对你有用,或者至少给你一些关于如何解决你的问题的灵感。 🙂

关于rebus - 如何在消息处理程序中立即停止处理新消息?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/69621988/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com