gpt4 book ai didi

c# - 批量消费消息——RabbitMQ

转载 作者:太空狗 更新时间:2023-10-29 21:43:32 25 4
gpt4 key购买 nike

我能够使用上面的代码使用不同的路由 key 使用多个生产者发送到同一个交换的多条消息,并且能够将每条消息插入数据库。

但这会消耗太多资源,因为消息将一个接一个地插入到数据库中。所以我决定进行批量插入,我发现我可以设置 BasicQos

在 BasicQos 中将消息限制设置为 10 后,我的期望是 Console.WriteLine 必须写入 10 条消息,但实际情况并非如此。

我的期望是从队列中消费 N 条消息并进行批量插入并成功发送 ACK 否则没有 ACK

这是我使用的一段代码。

using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
channel.QueueBind(queue: "queueName", exchange: "exchangeName", routingKey: "Producer_A");
channel.QueueBind(queue: "queueName", exchange: "exchangeName", routingKey: "Producer_B");

channel.BasicQos(0, 10, false);

var consumer = new EventingBasicConsumer(channel);
channel.BasicConsume(queue: "queueName", noAck: false, consumer: consumer);

consumer.Received += (model, ea) =>
{
try
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);

// Insert into Database

channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
Console.WriteLine(" Recevier Ack " + ea.DeliveryTag);
}
catch (Exception e)
{
channel.BasicNack(deliveryTag: ea.DeliveryTag, multiple: false, requeue: true);
Console.WriteLine(" Recevier No Ack " + ea.DeliveryTag);
}
};

Console.ReadLine();
}
}

最佳答案

BasicQos = 10 意味着客户端一次只获取 10 条消息,但是当您使用它时,您每次总是会看到一条消息。在这里阅读:https://www.rabbitmq.com/consumer-prefetch.html

AMQP specifies the basic.qos method to allow you to limit the number of unacknowledged messages on a channel (or connection) when consuming (aka "prefetch count").

对于您的范围,您必须下载消息,将其放入临时列表中,然后插入到数据库中。

然后你可以使用:

channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: true);

void basicAck()

Parameters: deliveryTag - the tag from the received AMQP.Basic.GetOk or AMQP.Basic.Deliver

multiple - true to acknowledge all messages up to and including the supplied delivery tag; false to acknowledge just the supplied delivery tag.

示例

final List<String> myMessagges = new ArrayList<String>();
channel.basicConsume("my_queue", false, new DefaultConsumer(channel) {

@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
myMessagges.add(new String(body));
System.out.println("Received...");

if (myMessagges.size() >= 10) {
System.out.println("insert into DB...");
channel.basicAck(envelope.getDeliveryTag(), true);
myMessagges.clear();
}


}
});

关于c# - 批量消费消息——RabbitMQ,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38844610/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com