gpt4 book ai didi

php - RabbitMQ 和 PHP 如何将任务返回到队列?

转载 作者:行者123 更新时间:2023-12-04 13:23:07 24 4
gpt4 key购买 nike

如果处理结果不适合我,我如何将消息返回到队列中。仅找到有关消息确认的信息,但我认为它不适合我。如果作为处理的结果,我需要将参数 RETRY 消息添加回队列。然后这个 worker 或另一个 worker 再次拿起它并尝试处理它。

例如:

<?php
use PhpAmqpLib\Connection\AMQPStreamConnection;

echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";

$connection = new AMQPStreamConnection($AMQP);
$channel = $connection->channel();

$channel->queue_declare('test', false, false, false, false);

$callback = function($msg) {
$condition = json_decode($msg->body);

if (!$condition) {
# return to the queue
}
};

$channel->basic_consume('test', '', false, true, false, false, $callback);

while(count($channel->callbacks)) {
$channel->wait();
}

$channel->close();
$connection->close();
?>

最佳答案

将自动 no_ack 标志设置为 false

queue: Queue from where to get the messages
consumer_tag: Consumer identifier
no_local: Don't receive messages published by this consumer.
no_ack: Tells the server if the consumer will acknowledge the messages.
exclusive: Request exclusive consumer access, meaning only this consumer can access the queue
nowait:
callback: A PHP Callback

$channel->basic_consume('test', '', false, false, false, false, $callback);

你必须使用确认,如果你的进程不工作你可以忽略确认

<?php
use PhpAmqpLib\Connection\AMQPStreamConnection;

echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";

$connection = new AMQPStreamConnection($AMQP);
$channel = $connection->channel();

$channel->queue_declare('test', false, false, false, false);

$callback = function($message) {
$condition = json_decode($message->body);

if (!$condition) {
// return to the queue
$message->delivery_info['channel']->basic_nack($message->delivery_info['delivery_tag']);
}else{
// send ack , remove from queue
$message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
}
};

$channel->basic_consume('test', '', false, false, false, false, $callback);

while(count($channel->callbacks)) {
$channel->wait();
}

$channel->close();
$connection->close();

当然,使用这种方法,您将面对始终位于队列头部的消息,还有另一种可能性,如果你真的想要重试的轨迹,你可以按照下面的方法

为重试定义一个队列,最好是你的队列名 -retry 并且最好定义一个死信队列:-dlq

然后你可以做如下的事情:如何设置-retry队列:这是其中最重要的部分。您需要声明具有以下功能的队列:

x-dead-letter-exchange: 应该和你的主队列路由键相同
x-dead-letter-routing-key: 应该和你的主队列路由键相同
x-message-ttl:重试之间的延迟

代码是sudo代码,请不要复制粘贴,这只是给你一个提示

$maximumRetry = 5;
$callback = function($message) {
$body = json_decode($message->body);
try {
// process result is your condition
$message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
} catch(Exception $e) {
// return to the queue
$body['try_attempt'] = !empty($body['try_attempt'])? int($body['try_attempt']) + 1: 1
if ($body['try_attempt'] >= $maximumRetry ){
$message->delivery_info['channel']->basic_nack($message->delivery_info['delivery_tag']);
return
}
$msg = new AMQPMessage(json_encode($message));

$channel->basic_publish($msg, '', 'test-retry');
}
};

我们需要 3 个队列来重新绑定(bind)。

  • queue.example

    • 绑定(bind):
      • 交换:queue.exchange
      • 路由:queue.example
    • 特点:
      • x-dead-letter-exchange: queue.exchange
      • x-dead-letter-routing-key: queue.example-dlq
  • queue.example-dlq

    • 绑定(bind):
      • 交换:queue.exchange
      • 路由:queue.example-dlq
  • queue.example-retry

    • 绑定(bind):
      • 交换:queue.exchange
      • 路由:queue.example-retry
    • 特点:
      • x-dead-letter-exchange: queue.exchange
      • x-dead-letter-routing-key: queue.example-added
      • x-消息-ttl:10000

------------更新------------

Quorum 队列提供开箱即用的功能,因此在消费者中,您可以了解每条消息被重试的次数,您还可以轻松地为其定义一个死信队列,有关更多信息,您可以阅读有关 quorom queues 的更多信息和毒药 message handling

关于php - RabbitMQ 和 PHP 如何将任务返回到队列?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46270322/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com