c# - RabbitMQ durable queue not working (RPC-Server, RPC-Client)

Reposted. Author: 可可西里. Updated: 2023-11-01 08:47:12

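I'm wondering why my RabbitMQ RPC-Client always processes dead messages after a restart. _channel.QueueDeclare(queue, false, false, false, null); should disable buffering. If I use a different overload of QueueDeclare in the RPC-Client, I can no longer connect to the server. Is something wrong here? Any idea how to fix this?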


RPC-Server

    new Thread(() =>
    {
        var factory = new ConnectionFactory { HostName = _hostname };
        if (_port > 0)
            factory.Port = _port;
        _connection = factory.CreateConnection();
        _channel = _connection.CreateModel();

        // Arguments: queue name, durable = false, exclusive = false, autoDelete = false, arguments = null
        _channel.QueueDeclare(queue, false, false, false, null);
        // Deliver at most one unacknowledged message to this consumer at a time
        _channel.BasicQos(0, 1, false);
        var consumer = new QueueingBasicConsumer(_channel);
        // noAck = false: every request is acknowledged manually below
        _channel.BasicConsume(queue, false, consumer);
        IsRunning = true;
        while (IsRunning)
        {
            BasicDeliverEventArgs ea;
            try {
                ea = consumer.Queue.Dequeue();
            }
            catch (Exception) {
                // Dequeue failed (for example, the channel was closed):
                // leave the loop instead of using an unassigned ea.
                IsRunning = false;
                break;
            }
            var body = ea.Body;
            var props = ea.BasicProperties;
            var replyProps = _channel.CreateBasicProperties();
            replyProps.CorrelationId = props.CorrelationId;

            var xmlRequest = Encoding.UTF8.GetString(body);

            var messageRequest = XmlSerializer.DeserializeObject(xmlRequest, typeof(Message)) as Message;
            var messageResponse = handler(messageRequest);

            // Reply on the queue named by the client, then acknowledge the request
            _channel.BasicPublish("", props.ReplyTo, replyProps,
                messageResponse);
            _channel.BasicAck(ea.DeliveryTag, false);
        }
    }).Start();

RPC-Client

    public void Start()
    {
        if (IsRunning)
            return;
        var factory = new ConnectionFactory {
            HostName = _hostname,
            Endpoint = _port <= 0 ? new AmqpTcpEndpoint(_endpoint)
                                  : new AmqpTcpEndpoint(_endpoint, _port)
        };
        _connection = factory.CreateConnection();
        _channel = _connection.CreateModel();
        _replyQueueName = _channel.QueueDeclare(); // Do not connect any more
        _consumer = new QueueingBasicConsumer(_channel);
        _channel.BasicConsume(_replyQueueName, true, _consumer);
        IsRunning = true;
    }

    public Message Call(Message message)
    {
        if (!IsRunning)
            throw new Exception("Connection is not open.");
        var corrId = Guid.NewGuid().ToString().Replace("-", "");
        var props = _channel.CreateBasicProperties();
        props.ReplyTo = _replyQueueName;
        props.CorrelationId = corrId;

        if (!String.IsNullOrEmpty(_application))
            props.AppId = _application;

        message.InitializeProperties(_hostname, _nodeId, _uniqueId, props);

        var messageBytes = Encoding.UTF8.GetBytes(XmlSerializer.ConvertToString(message));
        _channel.BasicPublish("", _queue, props, messageBytes);

        try
        {
            while (IsRunning)
            {
                var ea = _consumer.Queue.Dequeue();
                if (ea.BasicProperties.CorrelationId == corrId)
                {
                    var xmlResponse = Encoding.UTF8.GetString(ea.Body);
                    try
                    {
                        return XmlSerializer.DeserializeObject(xmlResponse, typeof(Message)) as Message;
                    }
                    catch (Exception ex)
                    {
                        IsRunning = false;
                        return null;
                    }
                }
            }
        }
        catch (EndOfStreamException ex)
        {
            IsRunning = false;
            return null;
        }
        return null;
    }

Best answer

Try setting the DeliveryMode property to non-persistent (1) in your RPC-Client code, like this:

    public Message Call(Message message)
    {
        ...
        var props = _channel.CreateBasicProperties();
        props.DeliveryMode = 1; // you might want to do this in your RPC-Server as well
        ...
    }

AMQP Model Explained contains very useful material, including an explanation of how to handle messages that end up in a dead-letter queue.

Another useful note from the documentation, about queue durability:

Durable queues are persisted to disk and thus survive broker restarts. Queues that are not durable are called transient. Not all scenarios and use cases mandate queues to be durable.

Durability of a queue does not make messages that are routed to that queue durable. If broker is taken down and then brought back up, durable queue will be re-declared during broker startup, however, only persistent messages will be recovered.

Note that it talks about broker restarts, not publisher or consumer restarts.
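The interplay between queue durability and message persistence described in the quoted documentation can be illustrated with a small in-memory model. This is only a sketch: ToyBroker, ToyMessage and all their members are hypothetical names invented for the illustration, not part of the RabbitMQ .NET client, and the model ignores everything except the restart rule quoted above.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Toy in-memory model of a broker; NOT the RabbitMQ client API.
public sealed class ToyMessage
{
    public string Body { get; }

    // Mirrors the AMQP delivery modes: 1 = non-persistent, 2 = persistent.
    public byte DeliveryMode { get; }

    public ToyMessage(string body, byte deliveryMode)
    {
        Body = body;
        DeliveryMode = deliveryMode;
    }
}

public sealed class ToyBroker
{
    private sealed class ToyQueue
    {
        public bool Durable;
        public List<ToyMessage> Messages = new List<ToyMessage>();
    }

    private Dictionary<string, ToyQueue> _queues = new Dictionary<string, ToyQueue>();

    // Declaring an existing queue again is a no-op in this model.
    public void DeclareQueue(string name, bool durable)
    {
        if (!_queues.ContainsKey(name))
            _queues[name] = new ToyQueue { Durable = durable };
    }

    public void Publish(string queue, ToyMessage message)
    {
        _queues[queue].Messages.Add(message);
    }

    public bool QueueExists(string name) => _queues.ContainsKey(name);

    public IReadOnlyList<string> Bodies(string queue) =>
        _queues[queue].Messages.Select(m => m.Body).ToList();

    // Simulates a broker restart: transient queues vanish, and durable
    // queues come back holding only their persistent messages.
    public void Restart()
    {
        var survivors = new Dictionary<string, ToyQueue>();
        foreach (var pair in _queues.Where(p => p.Value.Durable))
        {
            pair.Value.Messages.RemoveAll(m => m.DeliveryMode != 2);
            survivors[pair.Key] = pair.Value;
        }
        _queues = survivors;
    }
}
```

In this model, only a call to Restart() discards anything. A publisher or consumer that restarts never triggers it, so messages that are already queued stay where they are regardless of their delivery mode.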

Regarding c# - RabbitMQ durable queue not working (RPC-Server, RPC-Client), we found a similar question on Stack Overflow: https://stackoverflow.com/questions/31369854/
