gpt4 book ai didi

c# - Rabbitmq - 连接/ channel /消费者的恢复

转载 作者:太空狗 更新时间:2023-10-29 18:13:09 25 4
gpt4 key购买 nike

我正在创建一个在无限循环中运行的消费者,以从队列中读取消息。我正在寻找有关如何在我的无限循环中恢复 abd continue 的建议/示例代码,即使存在网络中断也是如此。消费者必须保持运行,因为它将作为 WindowsService 安装。

1) 有人可以解释一下如何正确使用这些设置吗?它们有什么区别?

NetworkRecoveryInterval 
AutomaticRecoveryEnabled
RequestedHeartbeat

2) 请查看我当前的消费者示例代码。我正在使用 .Net RabbitMQ 客户端 v3.5.6。

以上设置将如何为我进行“恢复”?例如consumer.Queue.Dequeue 会阻塞直到恢复吗?这似乎不对所以...

我必须为此手动编码吗?例如consumer.Queue.Dequeue 是否会抛出一个异常,我必须为此检测并手动重新创建我的连接、 channel 和消费者?或者只是消费者,因为“AutomaticRecovery”会为我恢复 channel ?

这是否意味着我应该将消费者创建移动到 while 循环中? channel 创建怎么样?和连接创建?

3) 假设我必须手动执行一些恢复代码,是否有事件回调(以及我如何注册它们)告诉我存在网络问题?

谢谢!

public void StartConsumer(string queue)
{
using (IModel channel = this.Connection.CreateModel())
{
var consumer = new QueueingBasicConsumer(channel);
const bool noAck = false;
channel.BasicConsume(queue, noAck, consumer);

// do I need these conditions? or should I just do while(true)???
while (channel.IsOpen &&
Connection.IsOpen &&
consumer.IsRunning)
{
try
{
BasicDeliverEventArgs item;
if (consumer.Queue.Dequeue(Timeout, out item))
{
string message = System.Text.Encoding.UTF8.GetString(item.Body);
DoSomethingMethod(message);
channel.BasicAck(item.DeliveryTag, false);
}
}
catch (EndOfStreamException ex)
{
// this is likely due to some connection issue -- what am I to do?
}
catch (Exception ex)
{
// should never happen, but lets say my DoSomethingMethod(message); throws an exception
// presumably, I'll just log the error and keep on going
}
}
}
}

public IConnection Connection
{
get
{
if (_connection == null) // _connection defined in class -- private static IConnection _connection;
{
_connection = CreateConnection();
}
return _connection;
}
}

private IConnection CreateConnection()
{
ConnectionFactory factory = new ConnectionFactory()
{
HostName = "RabbitMqHostName",
UserName = "RabbitMqUserName",
Password = "RabbitMqPassword",
};

// why do we need to set this explicitly? shouldn't this be the default?
factory.AutomaticRecoveryEnabled = true;

// what is a good value to use?
factory.NetworkRecoveryInterval = TimeSpan.FromSeconds(5);

// what is a good value to use? How is this different from NetworkRecoveryInterval?
factory.RequestedHeartbeat = 5;

IConnection connection = factory.CreateConnection();
return connection;
}

最佳答案

RabbitMQ 特性

documentation on RabbitMQ's site其实真的很好。如果您想恢复队列、交换器和消费者,您正在寻找默认启用的拓扑恢复。自动恢复(即 enabled by default )包括:

  • 重新连接
  • 恢复连接监听器
  • 重新开通 channel
  • 恢复 channel 听众
  • 恢复 channel basic.qos设置,发布者确认和交易设置

NetworkRecoveryInterval 是执行自动恢复重试之前的时间量(默认为 5 秒)。

Heartbeat 还有一个目的,即识别无效的 TCP 连接。有more to read about that在 RabbitMQ 的网站上。

代码示例

为恢复编写可靠的代码很棘手。 EndOfStreamException(正如您所怀疑的那样)很可能是由于网络问题造成的。如果您使用 management plugin ,您可以通过从那里关闭连接来重现这一点,并看到触发了异常。对于类似生产的应用程序,您可能希望有一组代理,您可以在连接失败的情况下在它们之间进行切换。如果您有多个 RabbitMQ 代理,您可能还想保护自己免受一个或多个服务器上的长期服务器故障的影响。您可能想要实现错误策略,例如重新排队消息或使用死信交换。

我一直在思考这些事情并编写了一个瘦客户端,RawRabbit ,处理其中的一些事情。也许它适合你?如果不是,我建议您将 QueueingBasicConsumer 更改为 EventingBasicConsumer。它是事件驱动的,而不是线程阻塞的。

var eventConsumer = new EventingBasicConsumer(channel);
eventConsumer.Received += (sender, args) =>
{
var body = args.Body;
eventConsumer.Model.BasicAck(args.DeliveryTag, false);
};
channel.BasicConsume(queue, false, eventConsumer);

如果您激活了拓扑恢复,RabbitMQ 客户端将恢复消费者并再次开始接收消息。要进行更精细的控制,请连接 ConsumerCancelledShutdown 的事件处理程序以检测连接问题,并连接 Registered 以了解何时可以再次使用消费者。

关于c# - Rabbitmq - 连接/ channel /消费者的恢复,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33699165/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com