gpt4 book ai didi

macos - PHP 和 AMQP RabbitMQ 消费者

转载 作者:搜寻专家 更新时间:2023-10-31 22:12:19 27 4
gpt4 key购买 nike

我有一个有效的 PHP 脚本,它向 RabbitMQ 发布消息,它从制表符分隔的文本文件中解析出来。我从字面上将工作代码从该文件复制/粘贴到另一个文件,并想建立一个消费者来检索发布到交换的消息,对它们进行 json_decode 并将它们插入数据库。

每次尝试,甚至从 PHP.net 站点复制/粘贴示例代码,甚至 SO 中的示例,都会以空白屏幕和无错误消息失败,然后它甚至会终止 php-fpm 进程。

知道为什么队列不会绑定(bind)以及这里出了什么问题吗?

  • Nginx -> php-fpm
  • PHP 5.3.x
  • Macbook Pro(OSX Lion)
  • RabbitMQ(安装了 librabbitmq 和 pecl amqp)

这是我尝试过的一个示例,但我已经在 AMQP 文档上尝试了 PHP.net 和 SO 示例,但都没有用。我可以很好地发布,但是当我尝试绑定(bind)队列时失败,最终 php-fpm 锁定。

<?php
// Report all PHP errors
error_reporting(E_ALL);

/*****************************************
* MQ settings
****************************************/
$mq = array(
'host' => 'localhost',
'port' => 5672,
'login' => 'guest',
'password' => 'guest',
'exchange' => 'gbus.user',
'routing_key' => 'gbus.test.mike',
);

/*****************************************
* Connect to queue
****************************************/

$conn_args = array('host' => $mq['host'], 'port' => $mq['port'], 'login' => $mq['login'], 'password' => $mq['password']);
$conn = new AMQPConnection($conn_args);
$conn->connect();

$ch = new AMQPChannel($conn);

// Create a new queue
$q = new AMQPQueue($ch);
$q->declare('test-queue');
$q->bind($mq['exchange'],$mq['routing_key']);

?>
<br>
<font color="blue" face="arial" size="4">File Contents</font>
<hr>
<?php
while(true){
$msg=$q->get();
if ($msg['count']>-1){
echo "\n--------\n";
print_r($msg['msg']);
echo "\n--------\n";
}
sleep(1);
}
if (!$conn->disconnect()) {
throw new Exception('Could not disconnect');
}
?>

这是我用来发布到队列的示例,每次运行它时,我都会在 RabbitMQ 控制面板中查看 20 条新消息。我将测试限制为 20,但文件有数万行。

工作发布代码:

<?php
/*****************************************
* MQ settings
****************************************/
$mq = array(
'host' => 'localhost',
'port' => 5672,
'login' => 'guest',
'password' => 'guest',
'exchange' => 'gbus.user',
'routing_key' => 'gbus.test.mike',
);

/*****************************************
* Connect to queue
****************************************/

$conn_args = array('host' => $mq['host'], 'port' => $mq['port'], 'login' => $mq['login'], 'password' => $mq['password']);
$conn = new AMQPConnection($conn_args);
$conn->connect();

$ch = new AMQPChannel($conn);

$ex = new AMQPExchange($ch);
$ex->setName($mq['exchange']);


/*****************************************
* Parse the file
****************************************/
$filename = "/tmp/Users.txt";
$board = "test";

$fd = fopen ($filename, "r");
$contents = fread ($fd,filesize ($filename));

fclose ($fd);
$delimiter = "\r\n";
$rows = explode($delimiter, $contents);
$counter = 0;
?>
<br>
<font color="blue" face="arial" size="4">File Rows (first 20)</font>
<hr>
<?php
foreach ( $rows as $row )
{
$counter++;
echo "<b>Row $counter: </b> $row<br>";

// build list columns
list($login_name, $pwd, $account_type, $access_level, $status, $first_name, $last_name, $agent_code) = explode("\t", $row);

// build assoc array for json
$user = array("domain"=>$board, "username"=>$login_name, "user_id"=>$agent_code, "password"=>$pwd, "first_name"=>$first_name, "last_name"=>$last_name);

// Publish a message to the exchange with a routing key
$ex->publish(json_encode($user), $mq['routing_key'], AMQP_NOPARAM, array("content_type"=>"application/data"));

if($counter == 20) {
break;
}
}

$ch->close();
$conn->close();
?>

最佳答案

您是否尝试过此处找到的示例:https://github.com/rabbitmq/rabbitmq-tutorials/tree/master/php

您要查看的两个是:emit_log.php接收日志.php

使用的库与PECL/内置库不同,我相信内置的不支持消费。

关于macos - PHP 和 AMQP RabbitMQ 消费者,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/11549788/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com