gpt4 book ai didi

php - Ratchet PHP WAMP - React/ZeroMQ - 特定用户广播

转载 作者:IT王子 更新时间:2023-10-29 00:07:44 28 4
gpt4 key购买 nike

Note: This is not the same as this question which utilises MessageComponentInterface. I am using WampServerInterface instead, so this question pertains to that part specifically. I need an answer with code examples and an explanation, as I can see this being helpful to others in the future.

为单个用户尝试循环推送

我正在使用 Ratchet 和 ZeroMQ 的 WAMP 部分,我目前有 push integration tutorial 的工作版本.

我正在尝试执行以下操作:

  • zeromq 服务器已启动并运行,准备记录订阅者和取消订阅者
  • 用户通过 websocket 协议(protocol)在浏览器中连接
  • 一个循环被启动,它将数据发送给请求它的特定用户
  • 当用户断开连接时,该用户数据的循环将停止

我的第 (1) 和 (2) 点有效,但我遇到的问题是第三点:

首先:我怎样才能只向每个特定用户发送数据? 广播将其发送给每个人,除非“主题”最终可能是个人用户 ID?

其次:我有一个很大的安全问题。如果我要从客户端发送想要订阅的用户 ID,这似乎是我需要的,那么用户可以更改变量到另一个用户的 ID 并返回他们的数据。

第三点:我必须运行一个单独的 php 脚本,其中包含 zeromq 的代码以开始实际循环。我不确定这是最好的方法,我宁愿让它完全在代码库中工作,而不是单独的 php 文件。这是我需要整理的一个主要领域。

以下代码显示了我目前拥有的内容。

刚刚从控制台运行的服务器

我按字面意思输入 php bin/push-server.php 来运行它。订阅和取消订阅将输出到此终端以进行调试。

$loop   = React\EventLoop\Factory::create();
$pusher = Pusher;

$context = new React\ZMQ\Context($loop);
$pull = $context->getSocket(ZMQ::SOCKET_PULL);
$pull->bind('tcp://127.0.0.1:5555');
$pull->on('message', array($pusher, 'onMessage'));

$webSock = new React\Socket\Server($loop);
$webSock->listen(8080, '0.0.0.0'); // Binding to 0.0.0.0 means remotes can connect
$webServer = new Ratchet\Server\IoServer(
new Ratchet\WebSocket\WsServer(
new Ratchet\Wamp\WampServer(
$pusher
)
),
$webSock
);

$loop->run();

通过 websocket 发送数据的 Pusher

我省略了无用的东西,专注于 onMessage()onSubscribe() 方法。

public function onSubscribe(ConnectionInterface $conn, $topic) 
{
$subject = $topic->getId();
$ip = $conn->remoteAddress;

if (!array_key_exists($subject, $this->subscribedTopics))
{
$this->subscribedTopics[$subject] = $topic;
}

$this->clients[] = $conn->resourceId;

echo sprintf("New Connection: %s" . PHP_EOL, $conn->remoteAddress);
}

public function onMessage($entry) {
$entryData = json_decode($entry, true);

var_dump($entryData);

if (!array_key_exists($entryData['topic'], $this->subscribedTopics)) {
return;
}

$topic = $this->subscribedTopics[$entryData['topic']];

// This sends out everything to multiple users, not what I want!!
// I can't send() to individual connections from here I don't think :S
$topic->broadcast($entryData);
}

开始循环使用上述 Pusher 代码的脚本

这是我的问题 - 这是一个单独的 php 文件,希望将来可以集成到其他代码中,但目前我不确定如何正确使用它。我是否从 session 中获取用户 ID?我仍然需要从客户端发送它...

// Thought sessions might work here but they don't work for subscription
session_start();
$userId = $_SESSION['userId'];

$loop = React\EventLoop\Factory::create();

$context = new ZMQContext();
$socket = $context->getSocket(ZMQ::SOCKET_PUSH, 'my pusher');
$socket->connect("tcp://localhost:5555");

$i = 0;
$loop->addPeriodicTimer(4, function() use ($socket, $loop, $userId, &$i) {

$entryData = array(
'topic' => 'subscriptionTopicHere',
'userId' => $userId
);
$i++;

// So it doesn't go on infinitely if run from browser
if ($i >= 3)
{
$loop->stop();
}

// Send stuff to the queue
$socket->send(json_encode($entryData));
});

最后是要订阅的客户端js

$(document).ready(function() { 

var conn = new ab.Session(
'ws://localhost:8080'
, function() {
conn.subscribe('topicHere', function(topic, data) {
console.log(topic);
console.log(data);
});
}
, function() {
console.warn('WebSocket connection closed');
}
, {
'skipSubprotocolCheck': true
}
);
});

结论

以上是可行的,但我真的需要弄清楚以下几点:

  • 如何向个人用户发送个人消息?当他们访问在 JS 中启动 websocket 连接的页面时,我是否也应该启动将内容插入 PHP 队列的脚本(zeromq)?这就是我目前正在手动执行的操作,只是感觉不对

  • 当从 JS 订阅用户时,从 session 中获取用户 ID 并从客户端发送是不安全的。这可能是伪造的。请告诉我有一个更简单的方法,如果是,如何?

最佳答案

Note: My answer here does not include references to ZeroMQ, as I am not using it any more. However, I'm sure you will be able to figure out how to use ZeroMQ with this answer if you need to.

使用JSON

首先,Websocket RFCWAMP Spec声明要订阅的主题必须是一个字符串。我在这里有点作弊,但我仍然遵守规范:我正在传递 JSON。

{
"topic": "subject here",
"userId": "1",
"token": "dsah9273bui3f92h3r83f82h3"
}

JSON 仍然是一个字符串,但它允许我传递更多数据来代替“主题”,并且 PHP 在另一端执行 json_decode() 很简单。当然,您应该验证您确实收到了 JSON,但这取决于您的实现。

那么我在这里经过了什么,为什么?

  • 主题

主题是用户订阅的主题。您可以使用它来决定将哪些数据传回给用户。

  • 用户 ID

显然是用户的ID。您必须使用下一部分验证此用户是否存在并且是否允许订阅:

  • token

这应该是一个一次使用随机生成的 token ,在您的 PHP 中生成,并传递给 JavaScript 变量。当我说“一次使用”时,我的意思是每次你重新加载页面(并且,通过扩展,在每个 HTTP 请求上),你的 JavaScript 变量应该在那里有一个新的标记。此 token 应根据用户 ID 存储在数据库中。

然后,一旦发出 websocket 请求,您将 token 和用户 ID 与数据库中的那些相匹配,以确保用户确实是他们所说的人,并且他们没有乱用 JS 变量。

Note: In your event handler, you can use $conn->remoteAddress to get the IP of the connection, so if someone is trying to connect maliciously, you can block them (log them or something).

为什么会这样?

之所以有效,是因为每次新连接通过时,唯一 token 可确保任何用户都无法访问其他任何人的订阅数据。

服务器

这是我用于运行循环和事件处理程序的内容。我正在创建循环,完成所有装饰器样式对象的创建,并传入我的 EventHandler(我很快就会用到)以及其中的循环。

$loop = Factory::create();

new IoServer(
new WsServer(
new WampServer(
new EventHandler($loop) // This is my class. Pass in the loop!
)
),
$webSock
);

$loop->run();

事件处理器

class EventHandler implements WampServerInterface, MessageComponentInterface
{
/**
* @var \React\EventLoop\LoopInterface
*/
private $loop;

/**
* @var array List of connected clients
*/
private $clients;

/**
* Pass in the react event loop here
*/
public function __construct(LoopInterface $loop)
{
$this->loop = $loop;
}

/**
* A user connects, we store the connection by the unique resource id
*/
public function onOpen(ConnectionInterface $conn)
{
$this->clients[$conn->resourceId]['conn'] = $conn;
}

/**
* A user subscribes. The JSON is in $subscription->getId()
*/
public function onSubscribe(ConnectionInterface $conn, $subscription)
{
// This is the JSON passed in from your JavaScript
// Obviously you need to validate it's JSON and expected data etc...
$data = json_decode(subscription->getId());

// Validate the users id and token together against the db values

// Now, let's subscribe this user only
// 5 = the interval, in seconds
$timer = $this->loop->addPeriodicTimer(5, function() use ($subscription) {
$data = "whatever data you want to broadcast";
return $subscription->broadcast(json_encode($data));
});

// Store the timer against that user's connection resource Id
$this->clients[$conn->resourceId]['timer'] = $timer;
}

public function onClose(ConnectionInterface $conn)
{
// There might be a connection without a timer
// So make sure there is one before trying to cancel it!
if (isset($this->clients[$conn->resourceId]['timer']))
{
if ($this->clients[$conn->resourceId]['timer'] instanceof TimerInterface)
{
$this->loop->cancelTimer($this->clients[$conn->resourceId]['timer']);
}
}

unset($this->clients[$conn->resourceId]);
}

/** Implement all the extra methods the interfaces say that you must use **/
}

基本上就是这样。这里的要点是:

  • 唯一 token 、用户 ID 和连接 ID 提供了确保一个用户无法看到另一个用户的数据所需的唯一组合。
  • 唯一 token 意味着如果同一用户打开另一个页面并请求订阅,他们将拥有自己的连接 ID + token 组合,因此同一用户不会在同一页面上获得双倍的订阅(基本上,每个连接有自己的个人数据)。

扩展

在对数据进行任何操作之前,您应该确保所有数据都经过验证而不是黑客尝试。使用类似 Monolog 的方式记录所有连接尝试,并在发生任何严重问题时设置电子邮件转发(例如服务器停止工作,因为有人是 SCSS 并试图入侵您的服务器)。

结束点

  • 验证一切。我怎么强调都不过分。您在每次请求时都会更改的唯一 token 很重要
  • 请记住,如果您在每次 HTTP 请求时重新生成 token ,并且在尝试通过 websockets 连接之前发出 POST 请求,则必须在尝试连接之前将重新生成的 token 传回您的 JavaScript (否则你的 token 将无效)。
  • 记录一切。记录每个连接、询问什么主题和断开连接的人。 Monolog 非常适合这个。

关于php - Ratchet PHP WAMP - React/ZeroMQ - 特定用户广播,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/19065799/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com