gpt4 book ai didi

php - RabbitMQ 实现

转载 作者:可可西里 更新时间:2023-11-01 00:47:34 26 4
gpt4 key购买 nike

这是我目前的一些设置。

  • 用于将 (POST) 数据插入队列的 REST API
  • Queue 有一个始终运行的 Consumer 和 Produces to en Exchange
  • Exchange 路由到其他几个队列(比如 20+)
  • 每个 ( 20+ ) 队列执行特定任务(消费者也总是运行)
  • Cron 作业运行以检查所有(20 多个)任务是否已完成并生成到另一个队列

我不确定我是否喜欢一直运行的消费者,因为每个消费者使用大约 300MB 的 Ram(我认为它是 MB,目前不在我面前)并且我正在寻找另一种实现方式。

    M <-- Message coming from REST API
|
|
+-First Queue
|
|
| <-- The Exchange
/|\
/ | \
/ | \ <-- bind to multiple queues ( 20+ )
Q1 Q2 Q3 <-- Each Queue is a task that must be completed


| <-- CRON runs to check if all queues above have completed
|
|
Q4 <-- Queues 1,2 and 3 must finish first before Queue 4 can start
|
C <-- Consumer

我在下面的相关问题中建议使用 RPC,但问题是 RPC(据我了解)将有多个实例。这是一个资源密集型过程,我认为通过添加 RPC 调用只会使服务器陷入困境,然后变得无响应(如果我错了请纠正我)。

另一种方法是使用聚合器模式

这看起来正是我需要的,但我发现文档有限。有人做过这个图案吗?

我的问题是我对目前的实现方式不满意,我正在寻找改进流程的方法。我希望摆脱 CRON,实现新模式,而不是让消费者一直运行。

该进程目前也只支持每个消费者的单个实例。它可以有多个消费者,但我们当时只想要一个。

这是使用 RabbitMQBundle 在 PHP、Symfony2 框架中实现的

相关问题:

最佳答案

这里是 OldSound,RabbitMQ Bundle 的创建者。

bundle 本身不支持开箱即用的聚合器模式,但您可以使用底层的 php-amqplib 实现它。

要执行聚合,您需要发布具有关联 ID 的消息和沿着处理链标识的线程。然后聚合器将等待 X 数量的消息,根据您必须处理该特定任务的不同工作人员的数量。等待消息的一种方法是拥有一个数组,您可以在它们按相关 ID 索引时保留它们。

因此,无论何时收到传入消息,您都将执行以下操作:

$correlation_id = $msg->get('correlation_id');
$this->receivedMessages[$correlation_id]['msgs'][] = $msg;

然后你在某个地方做:

if ($someNumber == count($this->receivedMessages[$correlation_id]['msgs']) {
// proceed to next step
}

我现在实际上正在为 Symfony 开发一个工作流包,我计划很快将其开源。该 bundle 可用于以非常简单的方式实现您提出的用例(即,您只需要为每个任务提供服务)。

现在我想知道为什么每个消费者都占用 300 MB 的 RAM?你需要和他们一起运行完整的堆栈框架吗?如果可能,为消费者应用程序创建一个新的 Symfony 内核,并只加载您需要的内容以减少开销。

关于php - RabbitMQ 实现,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/14594569/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com