gpt4 book ai didi

c# - RabbitMQ 中的并发

转载 作者:太空宇宙 更新时间:2023-11-03 12:37:33 26 4
gpt4 key购买 nike

经过一周的编码和搜索论坛,似乎是时候问...

我有一个 C# 应用程序,它使用 EventingBasicConsumer 处理 RabbitMQ 发送的消息。我想同时处理多条消息,所以我在同一个连接上实例化了几个 channel (在本例中为 8 个),每个 channel 都有一个消费者。然后我将一个事件处理程序附加到每个消费者的 Received 事件。根据我目前的所有阅读,此设置应该允许事件处理程序由消费者并发触发,每个消费者都在自己的线程中运行。但在我的例子中,消费者仅在前一个消费者确认其消息后才按顺序接收消息。

有没有其他人经历过这种行为?我的理解是否正确,在这种情况下处理在技术上应该是并发的?

下面是一个基本代码,可以更好地说明问题:

Initialise() {
ConsumerChannels_ = new IModel[ConsumerCount_];
Consumers_ = new EventingBasicConsumer[ConsumerCount_];
for (int i = 0; i < ConsumerCount_; ++i)
{
ConsumerChannels_[i] = Connection_.CreateModel();
Consumers_[i] = new EventingBasicConsumer(ConsumerChannels_[i]);
Consumers_[i].Received += MessageReceived;
}
}

MessageReceived(IBasicConsumer sender, BasicDeliverEventArgs e)
{
int id = GetConsumerIndex(sender);
Log_.Debug("Consumer " + id + ": processing started...");
// do some time consuming processing here
sender.Model.BasicAck(e.DeliveryTag, false);
Log_.Debug("Consumer " + id + ": processing ended.");
}

我希望看到的是://并发处理

Consumer 1: processing started...

Consumer 2: processing started...

Consumer 3: processing started...

...

Consumer 6: processing ended.

Consumer 7: processing ended.

Consumer 8: processing ended.

但我得到的是://顺序处理

Consumer 1: processing started...

Consumer 1: processing ended.

Consumer 2: processing started...

Consumer 2: processing ended.

...

Consumer 8: processing started...

Consumer 8: processing ended.

如有任何关于如何进行的想法,我们将不胜感激。

最佳答案

您实际上可以在创建 ConnectionFactory 时设置并行处理任务的数量!

ConnectionFactory factory = new ConnectionFactory
{
ConsumerDispatchConcurrency = 2,
};

默认值为1,即serial/sequential processing。

我通过剖析 .NET client's source code 发现了这一点.这是有趣的部分(concurrency 是从 ConsumerDispatchConcurrency 设置的):

Func<Task> loopStart = ProcessChannelAsync;
if (concurrency == 1)
{
_worker = Task.Run(loopStart);
}
else
{
var tasks = new Task[concurrency];
for (int i = 0; i < concurrency; i++)
{
tasks[i] = Task.Run(loopStart);
}
_worker = Task.WhenAll(tasks);
}

但请注意,这可能会导致竞争条件!该属性有此备注:

For concurrency greater than one this removes the guarantee that consumers handle messages in the order they receive them. In addition to that consumers need to be thread/concurrency safe.

关于c# - RabbitMQ 中的并发,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40338774/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com