gpt4 book ai didi

c# - RabbitMQ 消费者恢复

转载 作者:太空宇宙 更新时间:2023-11-03 10:37:38 26 4
gpt4 key购买 nike

我最近开始研究 RabbitMQ。我使用作为我的消费者的 RabbitMQ .net 库创建了一个 Windows 服务。该消费者将用于处理批量流程,例如发送大量电子邮件等。

我通过实现作为 RabbitMQ .net 库的一部分的 SimpleRpcServer 类并覆盖 HandleCall/HandleCast 方法来构建它。就消费和处理消息而言,一切都很好。我们已经开始研究可用于将此 Windows 服务部署到 Amazon Web Services 的服务器的部署选项。将更新部署到 Windows 服务时,必须停止、更新该服务,然后再次启动该服务。

我的问题是:我该怎么做才能在 Windows 服务上触发 Stop 事件时,该服务要么等待所有当前传递给消费者的消息完成处理并重新排队任何已传送但尚未开始处理的消息。

下面是一些示例代码:

public partial class ExampleService: ServiceBase
{
private List<Task> _watcherTasks = new List<Task>();
protected override void OnStart(string[] args)
{
var factory = new ConnectionFactory() {
HostName = _hostname,
VirtualHost = _virtualHost,
UserName = _username,
Password = _password,
Ssl = new SslOption
{
Enabled = true,
ServerName = _hostname,
AcceptablePolicyErrors = SslPolicyErrors.RemoteCertificateNameMismatch |
SslPolicyErrors.RemoteCertificateChainErrors
},
RequestedHeartbeat = 30
};
conn = factory.CreateConnection();

var emailQueue = requestChannel.QueueDeclare("email", false, false, false, null);
var emailSub = new Subscription(requestChannel, emailQueue);
var emailServer = new ServiceBusConsumer(emailSub);
Task emailWatcher = Task.Run(() => emailServer.MainLoop());
_watcherTasks.Add(emailWatcher);
}

protected override void OnStop()
{
conn.Close();
Task.WaitAll(_watcherTasks.ToArray(), 60000);
}
}

public class ServiceBusConsumer : SimpleRpcServer
{
public ServiceBusConsumer(Subscription subscription) : base(subscription)
{

}
public override void HandleSimpleCast(bool isRedelivered, RabbitMQ.Client.IBasicProperties requestProperties, byte[] body)
{
try
{
//Uses some reflection and invokes function to process the message here.
}
catch (Exception ex)
{
//Creates event log entry of exception
}
}
}

最佳答案

您不能在消息被消费后显式地将消息推送回队列。终止使用消息的进程应该会导致消息在超时后重新排队,前提是没有发送确认,并且您的队列配置为接收确认。不过请考虑,这可能会中断队列中消息的排序。

例如,假设您发送消息 M1 和 M2,其中 M2 依赖于 M1。如果您在处理 M1 时终止您的消费者,并且当 M2 仍在队列中时,M1 将在 M2 之后重新排队。当您的消费者重新启动时,这些消息将被乱序处理。

尝试实现这一点会引入太多的复杂性。我会简单地让您的 Windows 服务完成处理其出队消息,然后简单地停止监听队列,并优雅地终止。您的代码应如下所示:

while (!stopConsuming) {
try {
BasicDeliverEventArgs basicDeliverEventArgs;
var messageIsAvailable = consumer.Queue.Dequeue(timeout, out basicDeliverEventArgs);

if (!messageIsAvailable) continue;
var payload = basicDeliverEventArgs.Body;

var message = Encoding.UTF8.GetString(payload);
OnMessageReceived(new MessageReceivedEventArgs {
Message = message,
EventArgs = basicDeliverEventArgs
});

if (implicitAck && !noAck) channel.BasicAck(basicDeliverEventArgs.DeliveryTag, false);
}

catch (Exception exception) {
OnMessageReceived(new MessageReceivedEventArgs {
Exception = new AMQPConsumerProcessingException(exception)
});
if (!catchAllExceptions) Stop();
}
}
}

在此示例中,stopConsuming 变量(如果从另一个线程访问则标记为 volatile)将确定 Windows 服务是否继续从队列中读取消息,在它处理完当前消息之后。如果设置为 true,循环将结束,不再有消息出队。

我将要部署一个 C# 库,它可以完全满足您的要求,除此之外。请查看我的博客,insidethecpu.com有关详细信息和教程。您可能还对 this 感兴趣.

关于c# - RabbitMQ 消费者恢复,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27028665/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com