gpt4 book ai didi

php - RabbitMQ 交换存储消息

转载 作者:行者123 更新时间:2023-12-05 05:11:41 26 4
gpt4 key购买 nike

我正在尝试弄清楚即使没有消费者在运行,是否也可以在 RabbitMQ 交换中存储消息。

我理解(可能不正确)要实现交换需要“持久”以及队列和消息需要使用“持久”标志发送

'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT

我的主要目标是将所有消息存储在交换器中,这样,如果由于任何原因没有消费者在运行,当我启动一个时,交换器中的所有消息都可以定向到绑定(bind)队列。我声明我的交换和队列如下:

//Sender.php
public function sendToQueue(ActionMessage $message)
{
$headers = [
'content-type' => 'application/json',
'timestamp' => $message->getCreatedAt()->getTimestamp(),
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
];
$channel = $this->connection->getChannel();
$channel->exchange_declare($this->exchangeName, 'direct', false, true, false);
$qMessage = new AMQPMessage(json_encode($message->toArray()), $headers);
$channel->basic_publish($qMessage, $this->exchangeName, $message->getTopic());
return true;
}
//Receiver.php
public function consume($callbackFunction)
{
$channel = $this->messenger->getChannel();
$channel->exchange_declare($this->exchange, 'direct', false, true, false);
list($queueName, ,) = $channel->queue_declare('', false, true, true, false);
$channel->queue_bind($queueName, $this->exchangeName, $this->topicAction);

$channel->basic_consume($queueName, '', false, true, false, false, $callbackFunction);

while (count($channel->callbacks)) {
$channel->wait();
}

$channel->close();
$this->messenger->close();
}

我将不胜感激任何帮助(即使只是放弃这个想法并在两者之间插入一些存储空间)。谢谢。

最佳答案

交换器不存储消息,这是队列的工作。您遇到的问题不是没有消费者 在运行,而是没有队列 存在,因为您让消费者声明他们自己的队列。

如果您希望消息在消费者接收之前一直存在,您应该声明:

  • “发送方”将发布到的交易所
  • 附加到该交换器的命名队列,用于将单独使用的每种类型的消息(如果使用直接交换器,则每个路由键一个)

这些都可以在 Sender 脚本中声明,但在大多数情况下,在部署应用程序时声明一次更有意义,就像对待数据库模式一样对待它们。

无需在 Receiver 脚本中创建匿名队列,您可以只附加到命名队列,并开始接收等待在那里的消息。

这将产生的主要区别在于同一路由 key 的多个消费者将如何交互:

  • 多个队列附加到单个交换器,如在您现有的代码中,创建每条消息的多个副本。如果您有不同的消费者使用相同的消息执行不同的操作,这将非常有用。
  • 正如我在上面所建议的那样,连接到单个队列的多个消费者将共享消息,每个消息基本上由不同的消费者随机处理。如果您有多个相同的消费者来处理大量消息,这很有用。

您可能会找到 this RabbitMQ simulator有助于可视化差异。

您可能会发现您实际上想要混合:

  • 为每个必须查看每条消息的消费者预先声明一个队列,以确保存储每条消息的副本,直到该特定消费者准备好阅读它。
  • 在额外的消费者中声明额外的临时队列,以便在消息传入时获取一份额外的副本

最后一点,RabbitMQ 中有两种机制可以回退到不同处理无法处理的消息:

  • Alternate Exchange捕获将从交换中丢弃的消息(因为没有适当的队列绑定(bind))。
  • A Dead Letter Exchange捕获将从队列中丢弃的消息(例如,因为它被消费者拒绝,或达到配置的超时)。

如果您实际上不想正常处理丢失的消息,您只是想检测它们,例如,AE 在您的示例中可能会有用。在错误日志中列出它们。

关于php - RabbitMQ 交换存储消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55297823/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com