gpt4 book ai didi

php - Ratchet PHP WAMP - React/ZeroMQ - 特定用户广播

转载 作者:IT王子 更新时间:2023-10-29 00:07:44 28 4
gpt4 key购买 nike

Note: This is not the same as this question which utilises MessageComponentInterface. I am using WampServerInterface instead, so this question pertains to that part specifically. I need an answer with code examples and an explanation, as I can see this being helpful to others in the future.


我正在使用 Ratchet 和 ZeroMQ 的 WAMP 部分,我目前有 push integration tutorial 的工作版本.


  • zeromq 服务器已启动并运行,准备记录订阅者和取消订阅者
  • 用户通过 websocket 协议(protocol)在浏览器中连接
  • 一个循环被启动,它将数据发送给请求它的特定用户
  • 当用户断开连接时,该用户数据的循环将停止

我的第 (1) 和 (2) 点有效,但我遇到的问题是第三点:

首先:我怎样才能只向每个特定用户发送数据? 广播将其发送给每个人,除非“主题”最终可能是个人用户 ID?

其次:我有一个很大的安全问题。如果我要从客户端发送想要订阅的用户 ID,这似乎是我需要的,那么用户可以更改变量到另一个用户的 ID 并返回他们的数据。

第三点:我必须运行一个单独的 php 脚本,其中包含 zeromq 的代码以开始实际循环。我不确定这是最好的方法,我宁愿让它完全在代码库中工作,而不是单独的 php 文件。这是我需要整理的一个主要领域。



我按字面意思输入 php bin/push-server.php 来运行它。订阅和取消订阅将输出到此终端以进行调试。

$loop   = React\EventLoop\Factory::create();
$pusher = Pusher;

$context = new React\ZMQ\Context($loop);
$pull = $context->getSocket(ZMQ::SOCKET_PULL);
$pull->on('message', array($pusher, 'onMessage'));

$webSock = new React\Socket\Server($loop);
$webSock->listen(8080, ''); // Binding to means remotes can connect
$webServer = new Ratchet\Server\IoServer(
new Ratchet\WebSocket\WsServer(
new Ratchet\Wamp\WampServer(


通过 websocket 发送数据的 Pusher

我省略了无用的东西,专注于 onMessage()onSubscribe() 方法。

public function onSubscribe(ConnectionInterface $conn, $topic) 
$subject = $topic->getId();
$ip = $conn->remoteAddress;

if (!array_key_exists($subject, $this->subscribedTopics))
$this->subscribedTopics[$subject] = $topic;

$this->clients[] = $conn->resourceId;

echo sprintf("New Connection: %s" . PHP_EOL, $conn->remoteAddress);

public function onMessage($entry) {
$entryData = json_decode($entry, true);


if (!array_key_exists($entryData['topic'], $this->subscribedTopics)) {

$topic = $this->subscribedTopics[$entryData['topic']];

// This sends out everything to multiple users, not what I want!!
// I can't send() to individual connections from here I don't think :S

开始循环使用上述 Pusher 代码的脚本

这是我的问题 - 这是一个单独的 php 文件,希望将来可以集成到其他代码中,但目前我不确定如何正确使用它。我是否从 session 中获取用户 ID?我仍然需要从客户端发送它...

// Thought sessions might work here but they don't work for subscription
$userId = $_SESSION['userId'];

$loop = React\EventLoop\Factory::create();

$context = new ZMQContext();
$socket = $context->getSocket(ZMQ::SOCKET_PUSH, 'my pusher');

$i = 0;
$loop->addPeriodicTimer(4, function() use ($socket, $loop, $userId, &$i) {

$entryData = array(
'topic' => 'subscriptionTopicHere',
'userId' => $userId

// So it doesn't go on infinitely if run from browser
if ($i >= 3)

// Send stuff to the queue


$(document).ready(function() { 

var conn = new ab.Session(
, function() {
conn.subscribe('topicHere', function(topic, data) {
, function() {
console.warn('WebSocket connection closed');
, {
'skipSubprotocolCheck': true



  • 如何向个人用户发送个人消息?当他们访问在 JS 中启动 websocket 连接的页面时,我是否也应该启动将内容插入 PHP 队列的脚本(zeromq)?这就是我目前正在手动执行的操作,只是感觉不对

  • 当从 JS 订阅用户时,从 session 中获取用户 ID 并从客户端发送是不安全的。这可能是伪造的。请告诉我有一个更简单的方法,如果是,如何?


Note: My answer here does not include references to ZeroMQ, as I am not using it any more. However, I'm sure you will be able to figure out how to use ZeroMQ with this answer if you need to.


首先,Websocket RFCWAMP Spec声明要订阅的主题必须是一个字符串。我在这里有点作弊,但我仍然遵守规范:我正在传递 JSON。

"topic": "subject here",
"userId": "1",
"token": "dsah9273bui3f92h3r83f82h3"

JSON 仍然是一个字符串,但它允许我传递更多数据来代替“主题”,并且 PHP 在另一端执行 json_decode() 很简单。当然,您应该验证您确实收到了 JSON,但这取决于您的实现。


  • 主题


  • 用户 ID


  • token

这应该是一个一次使用随机生成的 token ,在您的 PHP 中生成,并传递给 JavaScript 变量。当我说“一次使用”时,我的意思是每次你重新加载页面(并且,通过扩展,在每个 HTTP 请求上),你的 JavaScript 变量应该在那里有一个新的标记。此 token 应根据用户 ID 存储在数据库中。

然后,一旦发出 websocket 请求,您将 token 和用户 ID 与数据库中的那些相匹配,以确保用户确实是他们所说的人,并且他们没有乱用 JS 变量。

Note: In your event handler, you can use $conn->remoteAddress to get the IP of the connection, so if someone is trying to connect maliciously, you can block them (log them or something).


之所以有效,是因为每次新连接通过时,唯一 token 可确保任何用户都无法访问其他任何人的订阅数据。


这是我用于运行循环和事件处理程序的内容。我正在创建循环,完成所有装饰器样式对象的创建,并传入我的 EventHandler(我很快就会用到)以及其中的循环。

$loop = Factory::create();

new IoServer(
new WsServer(
new WampServer(
new EventHandler($loop) // This is my class. Pass in the loop!



class EventHandler implements WampServerInterface, MessageComponentInterface
* @var \React\EventLoop\LoopInterface
private $loop;

* @var array List of connected clients
private $clients;

* Pass in the react event loop here
public function __construct(LoopInterface $loop)
$this->loop = $loop;

* A user connects, we store the connection by the unique resource id
public function onOpen(ConnectionInterface $conn)
$this->clients[$conn->resourceId]['conn'] = $conn;

* A user subscribes. The JSON is in $subscription->getId()
public function onSubscribe(ConnectionInterface $conn, $subscription)
// This is the JSON passed in from your JavaScript
// Obviously you need to validate it's JSON and expected data etc...
$data = json_decode(subscription->getId());

// Validate the users id and token together against the db values

// Now, let's subscribe this user only
// 5 = the interval, in seconds
$timer = $this->loop->addPeriodicTimer(5, function() use ($subscription) {
$data = "whatever data you want to broadcast";
return $subscription->broadcast(json_encode($data));

// Store the timer against that user's connection resource Id
$this->clients[$conn->resourceId]['timer'] = $timer;

public function onClose(ConnectionInterface $conn)
// There might be a connection without a timer
// So make sure there is one before trying to cancel it!
if (isset($this->clients[$conn->resourceId]['timer']))
if ($this->clients[$conn->resourceId]['timer'] instanceof TimerInterface)


/** Implement all the extra methods the interfaces say that you must use **/


  • 唯一 token 、用户 ID 和连接 ID 提供了确保一个用户无法看到另一个用户的数据所需的唯一组合。
  • 唯一 token 意味着如果同一用户打开另一个页面并请求订阅,他们将拥有自己的连接 ID + token 组合,因此同一用户不会在同一页面上获得双倍的订阅(基本上,每个连接有自己的个人数据)。


在对数据进行任何操作之前,您应该确保所有数据都经过验证而不是黑客尝试。使用类似 Monolog 的方式记录所有连接尝试,并在发生任何严重问题时设置电子邮件转发(例如服务器停止工作,因为有人是 SCSS 并试图入侵您的服务器)。


  • 验证一切。我怎么强调都不过分。您在每次请求时都会更改的唯一 token 很重要
  • 请记住,如果您在每次 HTTP 请求时重新生成 token ,并且在尝试通过 websockets 连接之前发出 POST 请求,则必须在尝试连接之前将重新生成的 token 传回您的 JavaScript (否则你的 token 将无效)。
  • 记录一切。记录每个连接、询问什么主题和断开连接的人。 Monolog 非常适合这个。

关于php - Ratchet PHP WAMP - React/ZeroMQ - 特定用户广播,我们在Stack Overflow上找到一个类似的问题:

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号