gpt4 book ai didi

node.js - 将网络套接字变成可观察的

转载 作者:太空宇宙 更新时间:2023-11-04 02:09:50 29 4
gpt4 key购买 nike

我有一个 net tcp 套接字 node.js 代码部分,我想将其从使用 callback 转换为 rx

我在 feed.js 模块中看起来像这样:

var net = require('net');
var server = net.createServer(function(socket) {
...

// Handle incoming messages from clients.
socket.on('data', function (data) {
broadcast(data, socket);
});

...
}

function broadcast(message, sender)
{
...
onChangeHandler(stock.symbol, 'stock', stock);
}
}

function start(onChange) {
onChangeHandler = onChange;
}

exports.start = start;
server.listen(....);

然后上述调用的客户端注册一个回调:

feed.start(function(room, type, message) {
//...Do something with the message
});

我想将其转换为使用 Rx Observable/Observer。我发现有一种方法可以从 web 套接字 生成可观察流(尽管它使用我不需要的双向 Subject):

fromWebSocket(address, protocol) {
var ws = new WebSocket(address, protocol);

// Handle the data
var osbervable = Rx.Observable.create (function (obs) {
// Handle messages
ws.onmessage = obs.onNext.bind(obs);
ws.onerror = obs.onError.bind(obs);
ws.onclose = obs.onCompleted.bind(obs);

// Return way to unsubscribe
return ws.close.bind(ws);
});

var observer = Rx.Observer.create(function (data) {
if (ws.readyState === WebSocket.OPEN) { ws.send(data); }
});

return Rx.Subject.create(observer, observable);
}

var socketSubject = fromWebSocket('ws://localhost:9999', 'sampleProtocol');

// Receive data
socketSubject.subscribe(
function (data) {
// Do something with the data
},
function (error) {
// Do something with the error
},
function () {
// Do something on completion
});

// Send data
socketSubject.onNext(42);

net 套接字的等效项是什么?如果有一个标准库可以使用那就可以了。

我最初的尝试是这样的,但我不知道如何将 Rx 和套接字函数一起绑定(bind)到 onnext 中:

var net = require('net');

fromNetSocket(address, protocol) {
var ns = net.createServer(function(socket) {


socket.on('disconnect', function () { // This seems like it maps to onclose

console.log('User disconnected. %s. Socket id %s', socket.id);
});

// Handle incoming messages from clients.
socket.on('data', function (data) { //this should map to onnext

});

// Handle the data
var osbervable = Rx.Observable.create (function (obs) {
// Handle messages
ns.onmessage = obs.onNext.bind(obs);
ns.onerror = obs.onError.bind(obs);
ns.onclose = obs.onCompleted.bind(obs);

// Return way to unsubscribe
return ns.close.bind(ns);
});
});
};

最佳答案

尝试以下操作

const createSubject = () => {

return Rx.Observable.create((observer) => {

const socket = net.connect({port: 1705}, () => {

log.i('Connected to Server!');

let socketObservable = Rx.Observable.create((observer) => {
socket.on('data', (data) => observer.next(JSON.parse(data)));
socket.on('error', (err) => observer.error(err));
socket.on('close', () => observer.complete());
});

let socketObserver = {
next: (data) => {
if (!socket.destroyed) {
socket.write(`${JSON.stringify(data)}\r\n`);
}
}
};

const subject = Rx.Subject.create(socketObserver, socketObservable);
observer.next(subject);
observer.complete();

});

});

};

然后你可以像这样使用主题

createSubject().subscribe((con) => {

con.subscribe((data) => console.log(data));

con.next({
id: utils.UUID(),
jsonrpc: '2.0',
method: 'Server.GetRPCVersion'
});

});

关于node.js - 将网络套接字变成可观察的,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42751073/

29 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com