- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在我的软件中尝试一些边缘情况。所以我创建了一个非常简单的测试环境:
我每 5 秒发送一条简单的短信。发送方和接收方运行在同一个 UNIX 机器上,而 RabbitMQ 服务器运行在网络中不同的机器上。到目前为止,一切都很好。现在,我使用 systemctl stop rabbitmq-server
停止我的 RabbitMQ 服务器。
我收到有关发送方和接收方的错误,这是预期的。
我使用 systemctl start rabbitmq-server
重启了 RabbitMQ 服务器。
现在好戏开始了!发件人可以恢复并继续发送消息但消费者无法恢复并且不会收到消息。它们在 RabbitMQ 服务器上累积!
这是我的来自发件人的日志条目(按预期工作):
2019-01-22 21:18:25.628 +01:00 [ERR] [EasyNetQ.PersistentConnection] [ThreadId 10] Failed to connect to broker infraserver-tbws2, port 5672, vhost testvh
RabbitMQ.Client.Exceptions.BrokerUnreachableException: None of the specified endpoints were reachable ---> System.AggregateException: One or more errors occurred. (Connection failed) ---> RabbitMQ.Client.Exceptions.ConnectFailureException: Connection failed ---> System.Net.Internals.SocketExceptionFactory+ExtendedSocketException: Connection refused 172.16.63.239:5672
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw(Exception source)
at System.Net.Sockets.Socket.EndConnect(IAsyncResult asyncResult)
at System.Net.Sockets.Socket.<>c.<ConnectAsync>b__272_0(IAsyncResult iar)
--- End of stack trace from previous location where exception was thrown ---
at RabbitMQ.Client.TcpClientAdapter.ConnectAsync(String host, Int32 port)
at RabbitMQ.Client.Impl.TaskExtensions.TimeoutAfter(Task task, Int32 millisecondsTimeout)
at RabbitMQ.Client.Impl.SocketFrameHandler.ConnectOrFail(ITcpClient socket, AmqpTcpEndpoint endpoint, Int32 timeout)
--- End of inner exception stack trace ---
at RabbitMQ.Client.Impl.SocketFrameHandler.ConnectUsingAddressFamily(AmqpTcpEndpoint endpoint, Func`2 socketFactory, Int32 timeout, AddressFamily family)
at RabbitMQ.Client.Impl.SocketFrameHandler.ConnectUsingIPv4(AmqpTcpEndpoint endpoint, Func`2 socketFactory, Int32 timeout)
at RabbitMQ.Client.Impl.SocketFrameHandler..ctor(AmqpTcpEndpoint endpoint, Func`2 socketFactory, Int32 connectionTimeout, Int32 readTimeout, Int32 writeTimeout)
at RabbitMQ.Client.Framing.Impl.IProtocolExtensions.CreateFrameHandler(IProtocol protocol, AmqpTcpEndpoint endpoint, Func`2 socketFactory, Int32 connectionTimeout, Int32 readTimeout, Int32 writeTimeout)
at RabbitMQ.Client.ConnectionFactory.CreateFrameHandler(AmqpTcpEndpoint endpoint)
at RabbitMQ.Client.EndpointResolverExtensions.SelectOne[T](IEndpointResolver resolver, Func`2 selector)
--- End of inner exception stack trace ---
at RabbitMQ.Client.EndpointResolverExtensions.SelectOne[T](IEndpointResolver resolver, Func`2 selector)
at RabbitMQ.Client.ConnectionFactory.CreateConnection(IEndpointResolver endpointResolver, String clientProvidedName)
--- End of inner exception stack trace ---
at RabbitMQ.Client.ConnectionFactory.CreateConnection(IEndpointResolver endpointResolver, String clientProvidedName)
at RabbitMQ.Client.ConnectionFactory.CreateConnection(String clientProvidedName)
at EasyNetQ.ConnectionFactoryWrapper.CreateConnection()
at EasyNetQ.PersistentConnection.TryToConnect()
2019-01-22 21:18:25.632 +01:00 [ERR] [EasyNetQ.PersistentConnection] [ThreadId 10] Failed to connect to any Broker. Retrying in 00:00:05
2019-01-22 21:18:35.444 +01:00 [INF] [] [ThreadId 1] Sucessfully sent Message 'Message: 'This is test message number 7.' | Num: 7 | Guid: 5345c7e4-61e6-4c79-8179-d4bef7864420'.
2019-01-22 21:18:40.452 +01:00 [INF] [] [ThreadId 1] Sucessfully sent Message 'Message: 'This is test message number 8.' | Num: 8 | Guid: 3cd8635c-cdfa-45f3-8495-2acb0713d47b'.
2019-01-22 21:18:45.457 +01:00 [INF] [] [ThreadId 1] Sucessfully sent Message 'Message: 'This is test message number 9.' | Num: 9 | Guid: 099462b8-cd66-40b9-ac10-89c3246819ec'.
2019-01-22 21:18:50.470 +01:00 [INF] [] [ThreadId 1] Sucessfully sent Message 'Message: 'This is test message number 10.' | Num: 10 | Guid: c25139b2-8e45-4771-9544-830014382e0c'.
2019-01-22 21:18:55.515 +01:00 [INF] [] [ThreadId 1] Sucessfully sent Message 'Message: 'This is test message number 11.' | Num: 11 | Guid: 90049d91-3805-4aaa-ac18-b61c09164afd'.
2019-01-22 21:19:00.526 +01:00 [INF] [] [ThreadId 1] Sucessfully sent Message 'Message: 'This is test message number 12.' | Num: 12 | Guid: 108ff318-6a34-4e64-94bd-dafa67aa6717'.
这显示了最后一条错误消息,然后可以看到 EasyNetQ 已恢复并可以再次传递消息。
消息消费者不工作!这是我的日志条目:
2019-01-22 21:18:25.623 +01:00 [ERR] [EasyNetQ.PersistentConnection] [ThreadId 12] Failed to connect to broker infraserver-tbws2, port 5672, vhost testvh
RabbitMQ.Client.Exceptions.BrokerUnreachableException: None of the specified endpoints were reachable ---> System.AggregateException: One or more errors occurred. (Connection failed) ---> RabbitMQ.Client.Exceptions.ConnectFailureException: Connection failed ---> System.Net.Internals.SocketExceptionFactory+ExtendedSocketException: Connection refused 172.16.63.239:5672
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw(Exception source)
at System.Net.Sockets.Socket.EndConnect(IAsyncResult asyncResult)
at System.Net.Sockets.Socket.<>c.<ConnectAsync>b__272_0(IAsyncResult iar)
--- End of stack trace from previous location where exception was thrown ---
at RabbitMQ.Client.TcpClientAdapter.ConnectAsync(String host, Int32 port)
at RabbitMQ.Client.Impl.TaskExtensions.TimeoutAfter(Task task, Int32 millisecondsTimeout)
at RabbitMQ.Client.Impl.SocketFrameHandler.ConnectOrFail(ITcpClient socket, AmqpTcpEndpoint endpoint, Int32 timeout)
--- End of inner exception stack trace ---
at RabbitMQ.Client.Impl.SocketFrameHandler.ConnectUsingAddressFamily(AmqpTcpEndpoint endpoint, Func`2 socketFactory, Int32 timeout, AddressFamily family)
at RabbitMQ.Client.Impl.SocketFrameHandler.ConnectUsingIPv4(AmqpTcpEndpoint endpoint, Func`2 socketFactory, Int32 timeout)
at RabbitMQ.Client.Impl.SocketFrameHandler..ctor(AmqpTcpEndpoint endpoint, Func`2 socketFactory, Int32 connectionTimeout, Int32 readTimeout, Int32 writeTimeout)
at RabbitMQ.Client.Framing.Impl.IProtocolExtensions.CreateFrameHandler(IProtocol protocol, AmqpTcpEndpoint endpoint, Func`2 socketFactory, Int32 connectionTimeout, Int32 readTimeout, Int32 writeTimeout)
at RabbitMQ.Client.ConnectionFactory.CreateFrameHandler(AmqpTcpEndpoint endpoint)
at RabbitMQ.Client.EndpointResolverExtensions.SelectOne[T](IEndpointResolver resolver, Func`2 selector)
--- End of inner exception stack trace ---
at RabbitMQ.Client.EndpointResolverExtensions.SelectOne[T](IEndpointResolver resolver, Func`2 selector)
at RabbitMQ.Client.ConnectionFactory.CreateConnection(IEndpointResolver endpointResolver, String clientProvidedName)
--- End of inner exception stack trace ---
at RabbitMQ.Client.ConnectionFactory.CreateConnection(IEndpointResolver endpointResolver, String clientProvidedName)
at RabbitMQ.Client.ConnectionFactory.CreateConnection(String clientProvidedName)
at EasyNetQ.ConnectionFactoryWrapper.CreateConnection()
at EasyNetQ.PersistentConnection.TryToConnect()
2019-01-22 21:18:25.625 +01:00 [ERR] [EasyNetQ.PersistentConnection] [ThreadId 12] Failed to connect to any Broker. Retrying in 00:00:05
它就在这里,永远等待!当消息在 RabbitMQ 服务器上累积时,看起来事情在某种程度上死锁:
当我停止我的消费者应用程序并重新启动它时,消息会被拾取。
我使用以下代码连接两个应用程序(发送方和消费者):
private static IBus SetupRabbitMqConnection(string rabbitServer, string rabbitVHost, ushort rabbitPort, string rabbitUser, string rabbitPwd)
{
Log.Logger.Debug($"Creating a connection to RabbitMQ server '{rabbitServer}' on port {rabbitPort.ToString()} " +
$"using the EasyNetQ library....");
try
{
var connStr = $"host={rabbitServer}:{rabbitPort.ToString()};virtualHost={rabbitVHost};username={rabbitUser};" +
$"password={rabbitPwd};publisherConfirms=true;timeout=30;prefetchcount=1;requestedHeartbeat=30";
var msgBus = RabbitHutch.CreateBus(connStr, x => { });
if (!msgBus.IsConnected)
{
var errMsg = $"Currently not connected to RabbitMQ server '{rabbitServer}'.";
Log.Logger.Error(errMsg);
}
Log.Logger.Debug("Successfully connected to RabbitMQ server.");
return msgBus;
}
catch (Exception ex)
{
Log.Logger.Error($"Error to establish a connection to RabbitMQ server '{rabbitServer}'. Error: {ex}");
throw;
}
}
消费者程序注册监听器如下:
var msgBus = SetupRabbitMqConnection(rabbitServer, vhost, rabbitPort, rabbitUser, rabbitPwd);
RegisterMsgSubscriptions(msgBus);
private static void RegisterMsgSubscriptions(IBus msgBus)
{
Log.Logger.Debug("Starting to register RabbitMQ message subscriptions...");
try
{
#region Queue declarations
var advancedBus = msgBus.Advanced;
var testQueueOne = new EasyNetQ.Topology.Queue(TestQueueOneName, true);
Log.Logger.Debug("Finished declaring queues.");
#endregion
#region Message Queue Handler registrations
advancedBus.Consume(testQueueOne, registration => registration
.Add<RabbitMessage<TestTextMessageDto>>(MessageProcessor.ProcessRabbitTestMessage));
Log.Logger.Debug("Finished registration of message handlers for several queues.");
#endregion
}
catch (Exception ex)
{
Log.Logger.Error($"Error registering message handler. Error: {ex}");
}
}
知道这里可能出了什么问题吗?在生产环境中,我们有超过 100 台服务器在消费消息。这些服务器位于全国各地。 RabbitMQ 服务器在数据中心。因此,如果连接丢失,消费服务器必须恢复,否则无法使用!
最佳答案
只需在连接事件上添加订阅逻辑(它将在每个代理连接事件上触发)。
advancedBus.Connected += (sender, args) => {
// subscribe logic
};
当然你应该处理好不要在任何连接等上调用订阅逻辑两次
关于rabbitmq - RabbitMQ 服务器无法访问后,EasyNetQ 不会重新连接,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54316179/
我使用 EasyNetQ 为 RabbitMQ 实现了一个简单的客户端。连接后,我收到通知“队列模型关闭”。这是我的代码: var _bus = RabbitHutch.CreateBus(Strin
我使用 EasyNetQ 为 RabbitMQ 实现了一个简单的客户端。连接后,我收到通知“队列模型关闭”。这是我的代码: var _bus = RabbitHutch.CreateBus(Strin
我正在考虑使用 EasyNetQ 与 RabbitMQ 进行交互,想知道它是否可以支持以下情况: 使用一些任意参数(例如 x-message-ttl)在外部声明队列 使用 EasyNetQ 的客户端代
在我的测试应用程序中,我可以看到处理异常的消息被自动插入到默认的 EasyNetQ_Default_Error_Queue 中,这很棒。然后我可以使用 Hosepipe 成功地转储或重新排队这些消息。
我正在使用EasyNetQ,需要重试原始队列中的失败消息。问题是:即使我成功地增加了TriedCount变量(在每个msg的正文中),当EasyNetQ在异常发生后将消息发布到默认错误队列时,更新的T
我目前正在调查此问题,但我还是想问问。如果没有回答,我会在发现后发布答案。 问题如下: 应用程序调用 RabbitHutch.CreateBus 来创建 IBus/IAdvancedBus 的实例,以
我们使用 RabbitMQ 在 C# .Net(EasyNetQ 客户端)中对消息进行排队。 我想要一个消费者应用程序(C# 控制台应用程序)监听一个队列并为每种消息类型提供多个处理程序。 我实现了这
当发生错误消息发布到默认 EasyNetQ_Default_Error_Queue 时,我有四个消费者是否可以让每个队列消费者编写自己的错误交换 例如; Queue Name : A Error
我正在我的软件中尝试一些边缘情况。所以我创建了一个非常简单的测试环境: 在 CentOS 7 上运行的 RabbitMQ 服务器 在 CentOS 7 下运行的 C# 中针对 .NETCore 2.1
我最近发布了以下问题... Custom Error Queue Name when using EasyNetQ for RabbitMQ? ... 并提供了有关如何在使用 EasyNetQ 时使用
我在 C# 中使用 RabbitMQ 和 EasyNetQ 库。我在这里使用发布/订阅模式。我还有一些问题希望有人能帮助我解决: 如果在使用消息时出现错误,它会自动移至错误队列。我如何实现重试(以便它
当发布者希望消息得到答复时,如何确保在您将其扩展时它只会获得(对自己的消息)相关的答复? 我们有一个客户端进程发布一条消息供服务器进程回答。此外,我们有一个“监听器”进程,它只需要在不发布任何内容的情
我正在使用 EasyNetQ,我想知道如何获取现有的 IExchange和 IQueue不提前订阅? 在 IAdvanceBus我只能看到: ExchangeDeclareAsync QueueDec
我是消息传递新手,目前正在使用 RabbitMQ 进行调查作为我们系统架构的一部分,在不同服务之间提供消息传递。我有一个basic RabbitMQ example工作并且它可以通过总线传输基本的文本
我正在尝试使用 EasyNetQ 连接到 RabbitMQ。RabbitMQ 位于远程虚拟机上。 _rabbitBus = RabbitHutch.CreateBus( string.Form
所以我有以下架构:Angular SPA(单页应用程序)执行对 .NET Web API Controller 的调用,后者将消息发布到 Publisher EasyNetQ 窗口服务,后者将异步请求
谁能指导我找到适用于 Unity 5.x (.Net 3.5) 项目的 EasyNetQ 的正确版本? 我有一个针对 .Net framework 3.5(Unity 3.5 .net 子集基类库)的
与其让我未处理的异常进入EasyNetQ_Default_Error_Queue,我想知道是否有一种方法可以明确说明应该用于给定应用程序的错误队列的名称,因此错误不会t ALL 最终都在这个 Easy
我正在尝试将 EasyNetQ 与 Ninject 结合使用来记录消息。 我已经设法将 Ninject 设置为 EasyNetQ DI(我认为),但是当消息到达没有无参数构造函数的处理程序时(例如,我
我可以使用以下代码成功发布消息: using (IAdvancedBus bus = RabbitHutch.CreateBus("host=192.168.153.128:5672;
我是一名优秀的程序员,十分优秀!