- android - RelativeLayout 背景可绘制重叠内容
- android - 如何链接 cpufeatures lib 以获取 native android 库?
- java - OnItemClickListener 不起作用,但 OnLongItemClickListener 在自定义 ListView 中起作用
- java - Android 文件转字符串
我想知道如何使用 Amqpphplib 进行延迟。
我使用了这个很棒的 CoffeeScript 教程:
https://github.com/jamescarr/rabbitmq-scheduled-delivery
但它似乎不适用于 PHP-amqplib。
消息如我所愿地过期了,但似乎“x-dead-letter-exchange”不起作用。我使用了 RabbitMQ 管理控制台,我实时看到了所有队列的创建和删除。但是我的消息在过期后确实进入了即时队列。我用的是RabbitMQ 3.2.3版本,PHP-amqplib 2.2.*版本。
这是我的代码:
连接类:
class Connection
{
/**
* @var $ch
*/
public $ch;
/**
* @var $consumer_tag
*/
public $consumer_tag;
/**
* @var $exchange
*/
public $exchange;
/**
* @var $conn
*/
public $conn;
public function __construct($host, $port, $user, $password, $vhost)
{
$this->exchange = 'immediate';
$this->queue = 'right.now.queue';
$this->consumer_tag = 'consumer';
$this->conn = new AMQPConnection($host, $port, $user, $password, $vhost);
$this->ch = $this->conn->channel();
$this->ch->exchange_declare($this->exchange, 'direct', false, true, false);
$this->ch->queue_declare($this->queue, false, true, false, false, false);
$this->ch->queue_bind($this->queue, $this->exchange);
}
public function createDelayedQueue ($name, $delay_seconds) {
$this->ch->queue_declare($name, false, false, false, true, true, array(
"x-dead-letter-exchange" => array("S", $this->exchange),
"x-message-ttl" => array("I", $delay_seconds*1000),
"x-expires" => array("I", $delay_seconds*1000+1000)
));
}
}
发布代码
$name = 'send.later.'.$ts;
$amqp->createDelayedQueue($name, 2);
$msg = new AMQPMessage($msg_body, array('content_type' => 'text/plain', 'delivery_mode' => 2));
$amqp->ch->basic_publish($msg);
消费者代码
$amqp = $this->getContainer()->get('amqp_connexion');
$amqp->ch->basic_consume($amqp->queue, $amqp->consumer_tag, false, false, false, false, function ($msg) {
echo $msg->body;
echo "\n--------\n";
});
$output->writeln('Listening '.$amqp->queue.'...');
// Loop as long as the channel has callbacks registered
while (count($amqp->ch->callbacks)) {
$amqp->ch->wait();
}
最佳答案
我刚刚为 php 编写了一个简化的工作版本:
/////// simplified ///////
// include the AMQPlib Classes || use an autoloader
// queue/exchange names
$queueRightNow = 'right.now.queue';
$exchangeRightNow = 'right.now.exchange';
$queueDelayed5sec = 'delayed.five.seconds.queue';
$exchangeDelayed5sec = 'delayed.five.seconds.exchange';
$delay = 5; // delay in seconds
// create connection
$AMQPConnection = new \PhpAmqpLib\Connection\AMQPConnection('localhost',5672,'guest','guest');
// create a channel
$channel = $AMQPConnection->channel();
// create the right.now.queue, the exchange for that queue and bind them together
$channel->queue_declare($queueRightNow);
$channel->exchange_declare($exchangeRightNow, 'direct');
$channel->queue_bind($queueRightNow, $exchangeRightNow);
// now create the delayed queue and the exchange
$channel->queue_declare(
$queueDelayed5sec,
false,
false,
false,
true,
true,
array(
'x-message-ttl' => array('I', $delay*1000), // delay in seconds to milliseconds
"x-expires" => array("I", $delay*1000+1000),
'x-dead-letter-exchange' => array('S', $exchangeRightNow) // after message expiration in delay queue, move message to the right.now.queue
)
);
$channel->exchange_declare($exchangeDelayed5sec, 'direct');
$channel->queue_bind($queueDelayed5sec, $exchangeDelayed5sec);
// now create a message und publish it to the delayed exchange
$msg = new \PhpAmqpLib\Message\AMQPMessage(
time(),
array(
'delivery_mode' => 2
)
);
$channel->basic_publish($msg,$exchangeDelayed5sec);
// consume the delayed message
$consumeCallback = function(\PhpAmqpLib\Message\AMQPMessage $msg) {
$messagePublishedAt = $msg->body;
echo 'seconds between publishing and consuming: '
. (time()-$messagePublishedAt) . PHP_EOL;
};
$channel->basic_consume($queueRightNow, '', false, true, false, false, $consumeCallback);
// start consuming
while (count($channel->callbacks) > 0) {
$channel->wait();
}
关于php - 如何延迟? - php-amqplib,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/21942063/
使用 Node JS 和 RabbitMQ 的 amqplib 模块, 1) 有没有办法知道队列中有多少订阅者? 2)如何使每个队列都应该有一个消费者 key ? 谢谢! 最佳答案 AMQP 的 ba
我正在尝试在与 rabbitmq 队列服务器的连接失败时实现重新连接机制。此代码仅用于消费消息,以下是我的代码( channel Init 函数负责初始化消费者并绑定(bind)到队列)。 conne
我是 AMQP/RabbitMQ 新手,也是相对 Node.js 新手。我可以在客户端使用 amqplib NPM 库吗? 我希望能够从我的 Angular 应用程序将消息直接推送到 RabbitMQ
我想知道如何使用 Amqpphplib 进行延迟。 我使用了这个很棒的 CoffeeScript 教程: https://github.com/jamescarr/rabbitmq-scheduled
我正在用 express 编写一个服务注入(inject)器,以接收请求并将它们传递给 Rabbit MQ。 我似乎无法优雅地解决 checkQueue() 错误。每次我点击它们时,我的整个应用程序都
我正在使用 nodejs 的 amqplib 库与 RabbitMQ 一起工作。我正在尝试使用函数 checkQueue 来检查队列是否存在: mychannel.checkQueue('xxx',
我需要为我的连接设置一个友好名称,如下所示,而不是“?”在 RabbitMQ for amqplib for NodeJS: 我找到了 Java 和 Python 的示例,但还没有找到这个库的示例。谢
我正在使用 amqplib在我的 node.js 服务器中传输消息。我看到了一个来自 RabbitMQ official website 的例子: var amqp = require('amqpli
我正在使用 py-amqplib 在 Python 中访问 RabbitMQ。应用程序会不时收到监听某些 MQ 主题的请求。 第一次收到这样的请求时,它会创建一个 AMQP 连接和一个 channel
我正在使用amqlib模块使用rabbitmq和nodejs构建后台任务管理系统。 有些任务确实非常消耗 CPU,因此,如果我启动大量任务,而只有少数工作进程在运行,我的服务器可能会被终止(使用过多的
我正在运行一个由 2 个 RabbitMQ 服务器(可以是任意数量)组成的集群,并且我已经实现了故障转移,其中我的应用程序循环 RabbitMQ 列表并在连接断开时尝试重新连接。 如果我尝试连接的 R
我正在尝试使用 php-amqplib 发送和接收消息。它可以在终端上发送/接收。但是当使用网络浏览器时,无法从队列中接收它不断等待消息。我在 receive.php 中使用了以下代码 require
我正在使用rabbitMq node-amqplib库编写生产者和消费者,我担心突然失去服务器连接,如何检查连接是否有效? 最佳答案 AMQP 0-9-1 提供心跳功能,以确保应用程序层及时发现中断的
设置:Centos 6.2、RabbitMQ 3.1.3、PHP 5.4.3 我正在尝试从 php-amqlib 运行演示脚本, 具体来说amqp_consumer_non_blocking.php
我刚刚开始使用 php-amqplib 和 RabbitMQ,并且想要一种方法来处理由于某种原因而无法处理且被拒绝的消息。我认为人们处理这个问题的一种方式是使用死信队列。我正在尝试设置它,但到目前为止
我有一个 PHP 脚本中的消费者 worker 。 但有时 RabbitMQ 服务器会停止运行, 我收到此错误: PHP Fatal error: Uncaught exception 'Error
我有一个 PHP 脚本中的消费者 worker 。 但有时 RabbitMQ 服务器会停止运行, 我收到此错误: PHP Fatal error: Uncaught exception 'Error
我想使用nodeJs通过amqplib断言惰性模式下的队列存在。 我知道我可以通过创建策略来做到这一点,但我不想这样做,因为目前在我们现有的环境中很难实现自动化。 我尝试过以下方法:
我正在编写一个模块,充当 amqplib 的包装器。动机是,我们已经设置了一个现有的且定义良好的交换/队列/绑定(bind),我只想公开使用方法以允许使用传入数据。 为此,我的模块采用 callbac
我正在编写一个使用 amqplib 的 Channel#consume 方法的 worker。我希望这个工作人员等待作业并在它们出现在队列中时立即处理它们。 我编写了自己的模块来抽象出 ampqlib
我是一名优秀的程序员,十分优秀!