gpt4 book ai didi

php - 使用来自 RabbitMq 的不确认消息

转载 作者:IT王子 更新时间:2023-10-29 00:12:40 28 4
gpt4 key购买 nike

我创建了一个简单的发布者和一个使用 basic.consume 在队列上订阅的消费者。

当作业无异常运行时,我的消费者会确认消息。每当我遇到异常时,我都不会确认消息并提前返回。只有已确认的消息才会从队列中消失,因此工作正常。
现在我希望消费者再次接收失败的消息,但重新消费这些消息的唯一方法是重新启动消费者。

我需要如何处理这个用例?

设置代码

$channel = new AMQPChannel($connection);

$exchange = new AMQPExchange($channel);

$exchange->setName('my-exchange');
$exchange->setType('fanout');
$exchange->declare();

$queue = new AMQPQueue($channel);
$queue->setName('my-queue');
$queue->declare();
$queue->bind('my-exchange');

消费者代码

$queue->consume(array($this, 'callback'));

public function callback(AMQPEnvelope $msg)
{
try {
//Do some business logic
} catch (Exception $ex) {
//Log exception
return;
}
return $queue->ack($msg->getDeliveryTag());
}

生产者代码

$exchange->publish('message');

最佳答案

如果消息未被确认且应用程序失败,它将自动重新传送并且redelivered信封上的属性将设置为 true (除非您使用 no-ack = true 标志消费它们)。

更新:

你必须 nack在你的 catch block 中带有重新传递标志的消息

    try {
//Do some business logic
} catch (Exception $ex) {
//Log exception
return $queue->nack($msg->getDeliveryTag(), AMQP_REQUEUE);
}

当心无限无效的消息,而在 RabbitMQ 和 AMQP 协议(protocol)中根本没有实现重新传递计数。

如果您不想弄乱这些消息而只想添加一些延迟,您可能需要添加一些 sleep()usleep()之前 nack方法调用,但这根本不是一个好主意。

有多种技术可以处理循环重投问题:

<强>1。靠Dead Letter Exchanges

  • 优点:可靠、标准、清晰
  • 缺点:需要额外的逻辑

<强>2。使用 per message or per queue TTL

  • 优点:易于实现、标准、清晰
  • 缺点:排长队可能会丢失一些消息

示例(请注意,对于队列 ttl,我们只传递数字,而对于消息 ttl - 任何数字字符串):

2.1 每条消息 ttl:

$queue = new AMQPQueue($channel);
$queue->setName('my-queue');
$queue->declareQueue();
$queue->bind('my-exchange');

$exchange->publish(
'message at ' . microtime(true),
null,
AMQP_NOPARAM,
array(
'expiration' => '1000'
)
);

2.2。每个队列 ttl:

$queue = new AMQPQueue($channel);
$queue->setName('my-queue');
$queue->setArgument('x-message-ttl', 1000);
$queue->declareQueue();
$queue->bind('my-exchange');

$exchange->publish('message at ' . microtime(true));

<强>3。在消息正文或 header 中保留重新传送计数或剩余重新传送数(又名跳数限制或 IP 堆栈中的 ttl)

  • 优点:让您在应用程序级别上对消息生命周期进行额外控制
  • 缺点:当您必须修改消息并再次发布时,开销很大,特定于应用程序,不明确

代码:

$queue = new AMQPQueue($channel);
$queue->setName('my-queue');
$queue->declareQueue();
$queue->bind('my-exchange');

$exchange->publish(
'message at ' . microtime(true),
null,
AMQP_NOPARAM,
array(
'headers' => array(
'ttl' => 100
)
)
);

$queue->consume(
function (AMQPEnvelope $msg, AMQPQueue $queue) use ($exchange) {
$headers = $msg->getHeaders();
echo $msg->isRedelivery() ? 'redelivered' : 'origin', ' ';
echo $msg->getDeliveryTag(), ' ';
echo isset($headers['ttl']) ? $headers['ttl'] : 'no ttl' , ' ';
echo $msg->getBody(), PHP_EOL;

try {
//Do some business logic
throw new Exception('business logic failed');
} catch (Exception $ex) {
//Log exception
if (isset($headers['ttl'])) {
// with ttl logic

if ($headers['ttl'] > 0) {
$headers['ttl']--;

$exchange->publish($msg->getBody(), $msg->getRoutingKey(), AMQP_NOPARAM, array('headers' => $headers));
}

return $queue->ack($msg->getDeliveryTag());
} else {
// without ttl logic
return $queue->nack($msg->getDeliveryTag(), AMQP_REQUEUE); // or drop it without requeue
}

}

return $queue->ack($msg->getDeliveryTag());
}
);

可能还有一些其他方法可以更好地控制消息重新传送流。

结论:没有银弹解决方案。您必须决定哪种解决方案最适合您的需求或找出其他解决方案,但不要忘记在这里分享它;)

关于php - 使用来自 RabbitMq 的不确认消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/17654475/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com