gpt4 book ai didi

rabbitmq - 自动化 RabbitMQ 消费者测试

转载 作者:行者123 更新时间:2023-12-04 00:58:02 31 4
gpt4 key购买 nike

我有一个使用RabbitMQ客户端接收消息的.net微服务,我需要测试以下内容:

1-消费者成功连接到rabbitMq主机。

2-消费者正在收听队列。

3-消费者成功接收消息。

为了实现上述目标,我创建了一个发送消息的示例应用程序,我正在调试消费者以确保它正在接收消息。

我怎样才能自动化这个测试?因此将它包含在我的微服务 CI 中。

我正在考虑将我的示例应用程序包含在我的 CI 中,以便我可以触发一条消息,然后运行一个消费者单元测试,该测试等待特定时间,如果收到消息则通过,但这对我来说似乎是错误的做法,因为测试不会开始直到几秒钟消息被触发。

我想到的另一种方法是从单元测试本身触发示例应用程序,但如果示例应用程序无法工作,则会导致服务故障。

是否有通过 RabbitMQ 连接的微服务集成测试的最佳实践?

最佳答案

我已经建立了很多这样的测试。我已经抛出了一些基本的代码
Github here with .NET Core 2.0.

您将需要一个 RabbitMQ 集群来进行这些自动化测试。每个测试都从消除队列开始,以确保不存在任何消息。来自另一个测试的预先存在的消息将破坏当前的测试。

我有一个简单的助手来删除队列。在我的应用程序中,他们总是声明自己的队列,但如果不是你的情况,那么你将不得不再次创建队列并绑定(bind)到任何交换。

public class QueueDestroyer
{
public static void DeleteQueue(string queueName, string virtualHost)
{
var connectionFactory = new ConnectionFactory();
connectionFactory.HostName = "localhost";
connectionFactory.UserName = "guest";
connectionFactory.Password = "guest";
connectionFactory.VirtualHost = virtualHost;
var connection = connectionFactory.CreateConnection();
var channel = connection.CreateModel();
channel.QueueDelete(queueName);
connection.Close();
}
}

我创建了一个非常简单的消费者示例来表示您的微服务。它在任务中运行,直到取消。
public class Consumer
{
private IMessageProcessor _messageProcessor;
private Task _consumerTask;

public Consumer(IMessageProcessor messageProcessor)
{
_messageProcessor = messageProcessor;
}

public void Consume(CancellationToken token, string queueName)
{
_consumerTask = Task.Run(() =>
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: queueName,
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
_messageProcessor.ProcessMessage(message);
};
channel.BasicConsume(queue: queueName,
autoAck: false,
consumer: consumer);

while (!token.IsCancellationRequested)
Thread.Sleep(1000);
}
}
});
}

public void WaitForCompletion()
{
_consumerTask.Wait();
}

}

消费者有一个 IMessageProcessor 接口(interface),它将完成处理消息的工作。在我的集成测试中,我创建了一个假的。您可能会为此使用首选的模拟框架。

测试发布者向队列发布消息。
public class TestPublisher
{
public void Publish(string queueName, string message)
{
var factory = new ConnectionFactory() { HostName = "localhost", UserName="guest", Password="guest" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
var body = Encoding.UTF8.GetBytes(message);

channel.BasicPublish(exchange: "",
routingKey: queueName,
basicProperties: null,
body: body);
}
}
}

我的示例测试如下所示:
[Fact]
public void If_SendMessageToQueue_ThenConsumerReceiv4es()
{
// ARRANGE
QueueDestroyer.DeleteQueue("queueX", "/");
var cts = new CancellationTokenSource();
var fake = new FakeProcessor();
var myMicroService = new Consumer(fake);

// ACT
myMicroService.Consume(cts.Token, "queueX");

var producer = new TestPublisher();
producer.Publish("queueX", "hello");

Thread.Sleep(1000); // make sure the consumer will have received the message
cts.Cancel();

// ASSERT
Assert.Equal(1, fake.Messages.Count);
Assert.Equal("hello", fake.Messages[0]);
}

我的假货是这样的:
public class FakeProcessor : IMessageProcessor
{
public List<string> Messages { get; set; }

public FakeProcessor()
{
Messages = new List<string>();
}

public void ProcessMessage(string message)
{
Messages.Add(message);
}
}

额外的建议是:
  • 如果您可以在每次测试运行时将随机文本附加到您的队列并交换名称,那么请这样做以避免并发测试相互干扰
  • 如果您的应用程序不这样做,我在代码中还有一些用于声明队列、交换和绑定(bind)的助手。
  • 编写一个连接杀手类,该类将强制关闭连接并检查您的应用程序是否仍然有效并且可以恢复。我有这方面的代码,但没有在 .NET Core 中。只要问我,我就可以修改它以在 .NET Core 中运行。
  • 一般来说,我认为您应该避免在集成测试中包含其他微服务。例如,如果您从一个服务向另一个服务发送消息并期望返回消息,那么创建一个可以模拟预期行为的假消费者。如果您收到来自其他服务的消息,则在您的集成测试项目中创建假发布者。
  • 关于rabbitmq - 自动化 RabbitMQ 消费者测试,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50176793/

    31 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com