gpt4 book ai didi

php - 使用 RabbitMQ 的 Swoole

转载 作者:可可西里 更新时间:2023-11-01 12:59:36 25 4
gpt4 key购买 nike

我正在尝试使用 websockets 将一些数据从 php 应用程序发送到用户的浏览器。因此我决定使用 Swoole结合 RabbitMQ。

这是我第一次使用 websockets,在阅读了一些关于 Socket.IO、Ratchet 等的帖子后,我决定停止使用 Swoole,因为它是用 C 语言编写的,并且可以方便地与 php 一起使用。

这就是我对使用 websockets 启用数据传输的想法的理解:1) CLI启动RabbitMQ worker和Swoole server2)php应用向RabbitMQ发送数据3) RabbitMQ 向 worker 发送带有数据的消息4) Worker 收到带有数据的消息+ 与Swoole socket 服务器建立socket 连接。5)Swoole服务器向所有连接广播数据

问题是如何绑定(bind)Swoole socket server和RabbitMQ?或者如何让RabbitMQ与Swoole建立连接并向其发送数据?

代码如下:

Swoole服务器(swoole_sever.php)

$server = new \swoole_websocket_server("0.0.0.0", 2345, SWOOLE_BASE);

$server->on('open', function(\Swoole\Websocket\Server $server, $req)
{
echo "connection open: {$req->fd}\n";
});

$server->on('message', function($server, \Swoole\Websocket\Frame $frame)
{
echo "received message: {$frame->data}\n";
$server->push($frame->fd, json_encode(["hello", "world"]));
});

$server->on('close', function($server, $fd)
{
echo "connection close: {$fd}\n";
});

$server->start();

Worker 从 RabbitMQ 接收消息,然后连接到 Swoole 并通过套接字连接(worker.php)广播消息

$connection = new AMQPStreamConnection('0.0.0.0', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->queue_declare('task_queue', false, true, false, false);

echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";

$callback = function($msg){
echo " [x] Received ", $msg->body, "\n";
sleep(substr_count($msg->body, '.'));
echo " [x] Done", "\n";
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);


// Here I'm trying to make connection to Swoole server and sernd data
$cli = new \swoole_http_client('0.0.0.0', 2345);

$cli->on('message', function ($_cli, $frame) {
var_dump($frame);
});

$cli->upgrade('/', function($cli)
{
$cli->push('This is the message to send to Swoole server');
$cli->close();
});
};

$channel->basic_qos(null, 1, null);
$channel->basic_consume('task_queue', '', false, false, false, false, $callback);

while(count($channel->callbacks)) {
$channel->wait();
}

$channel->close();
$connection->close();

将消息发送到 RabbitMQ (new_task.php) 的新任务:

$connection = new AMQPStreamConnection('0.0.0.0', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->queue_declare('task_queue', false, true, false, false);

$data = implode(' ', array_slice($argv, 1));
if(empty($data)) $data = "Hello World!";
$msg = new AMQPMessage($data,
array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
);

$channel->basic_publish($msg, '', 'task_queue');

echo " [x] Sent ", $data, "\n";

$channel->close();
$connection->close();

启动 swoole server 和 worker 后,我从命令行触发 new_task.php:

php new_task.php

在运行 RabbitMQ Worker 的命令行提示符 (worker.php) 中,我可以看到一条消息已传递给该 worker(“[x] Received Hello World!” 消息正在出现)。

但是在运行 Swoole 服务器的命令行提示符下什么也没有发生。

所以问题是:1)这种做法的思路对吗?2) 我做错了什么?

最佳答案

在收到消息时触发的回调(在 worker.php 中)中,您使用的是仅异步的 swoole_http_client。这似乎导致代码永远不会被完全执行,因为回调函数在触发异步代码之前返回。

做同样事情的同步方法将解决问题。这是一个简单的例子:

$client = new WebSocketClient('0.0.0.0', 2345);
$client->connect();
$client->send('This is the message to send to Swoole server');
$recv = $client->recv();
print_r($recv);
$client->close();

github 查看 WebSocketClient 类和示例用法.

您也可以将其包装在 coroutine 中,像这样:

go(function () {
$client = new WebSocketClient('0.0.0.0', 2345);
$client->connect();
$client->send('This is the message to send to Swoole server');
$recv = $client->recv();
print_r($recv);
$client->close();
});

关于php - 使用 RabbitMQ 的 Swoole,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49226659/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com