gpt4 book ai didi

multithreading - RabbitMQ 消费者中的并发消息处理

转载 作者:行者123 更新时间:2023-12-02 03:23:55 28 4
gpt4 key购买 nike

我是 RabbitMQ 的新手,所以如果我的问题听起来微不足道,请原谅我。我想在 RabbitMQ 上发布消息,该消息将由 RabbitMQ 消费者处理。

我的消费者计算机是多核计算机(最好是 azure 上的辅助角色)。但 QueueBasicConsumer 一次推送一条消息。我如何编程以利用可以同时处理多条消息的所有核心。

  1. 一种解决方案可能是在多个线程中打开多个 channel ,然后在那里处理消息。但在这种情况下我将如何决定线程数。
  2. 另一种方法可能是在主线程上读取消息,然后创建任务并将消息传递给该任务。在这种情况下,如果有许多消息(在阈值之后)已经在处理中,我将不得不停止消费消息。不确定如何实现。

提前致谢

最佳答案

您的第二个选项听起来更合理 - 在单个 channel 上消费并生成多个任务来处理消息。要实现并发控制,您可以使用 semaphore控制飞行中的任务数量。在启动任务之前,您将等待信号量变得可用,任务完成后,它会向信号量发出信号以允许其他任务运行。

您还没有指定您选择的语言/技术堆栈,但无论您做什么 - 尝试使用线程池而不是自己创建和管理线程。在 .NET 中,这意味着使用 Task.Run异步处理消息。

C# 代码示例:

using (var semaphore = new SemaphoreSlim(MaxMessages))
{
while (true)
{
var args = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
semaphore.Wait();
Task.Run(() => ProcessMessage(args))
.ContinueWith(() => semaphore.Release());
}
}

您可能会发现在 channel 上启用显式 ACK 控制并使用 RabbitMQ Consumer Prefetch 更容易,而不是自己控制并发级别。设置未确认消息的最大数量。这样,您一次就不会收到多于您想要的消息。

关于multithreading - RabbitMQ 消费者中的并发消息处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31408734/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com