gpt4 book ai didi

javascript - RXJS Stream - 如何等待我的流结束并返回数据,然后执行我的操作

转载 作者:行者123 更新时间:2023-12-02 21:09:53 25 4
gpt4 key购买 nike

我有一个正在订阅的 Websocket 端点。我想获取该数据,然后对其进行操作。

代码:

// Simple HTTP POST Request. Works Perfectly. I am Logged In to the API
const authenticationRequest = () => axios.post(authenticationUrl, {
user: username, password
})
.then((response) => response.data)
.catch((error) => console.error(console.error('Error Response', error)));

// WS Request. I need to wait for this to return my Data and then operate on them
const wsRequest = async () => {
// Getting the Auth Token. Working Perfectly.
const reqToken = await authenticationRequest();

// Hitting the ws Endplint. Working Perfectly.
const webSocketRequest = new WebSocket(topicDataUrl);

// Setting the Data for the First Message. Works Perfectly.
const firstMessage = {
token: reqToken,
stats: 2,
sql: "SELECT * FROM cc_payments LIMIT 100",
live: false
};

// Initialising an Empty Array. Works.
let websocketData = [];

// Opening the Endpoint
webSocketRequest.onopen = () => {

// Sending the first Message
webSocketRequest.send(JSON.stringify(firstMessage));

// On Each Message
webSocketRequest.onmessage = (streamEvent) => {
of(streamEvent).pipe(
map(event => JSON.parse(event.data)), // Parse the Data
filter(message => message.type === 'RECORD') // Filter the Data
).subscribe(
message => websocketData.push(message.data.value)// Adding each Value from each message to the Array.
);
};
};
console.log(JSON.stringify(websocketData), 'Websocket DATA'); // Empty Array
return websocketData;
};

这里我调用了几行,但仍然没有结果。我得到一个空数组。

(async function () {
const data = await wsRequest();
console.log(JSON.stringify(data), 'Data'); // Still Empty
}());

那么,我做错了什么?有人可以向我解释一下这个问题吗?我的意思是我得到了事物的异步性,但我在等待。我什至尝试设置超时但没有成功。

我的直播正确吗?也许那里有问题??

因此,RXJS 操作是异步的。所以,我需要两件事。- 操作完成后关闭流。 (尝试了 takeUntiltakeWhile,但显然做错了什么)- 等待返回实际数据(WebsocketData)。

更新:

async function authenticationRequest() {
const AuthenticateWith = await axios.post(authenticationUrl, {
user: username,
password
})
.then(response => response.data)
.catch((error) => console.error('Error:', error));

return AuthenticateWith;
}
const webSocketRequest = new WebSocket(topicDataUrl);
const websocketData = new Array;
const subject = new Subject();

async function requestToWSEndpoint() {
const reqToken = await authenticationRequest();

const firstMessage = {
token: reqToken,
stats: 2,
sql: "SELECT * FROM cc_payments LIMIT 100",
live: false
};

webSocketRequest.onopen = () => {
webSocketRequest.send(JSON.stringify(firstMessage));

webSocketRequest.onmessage = (streamEvent) => {
JSON.parse(streamEvent.data).type === 'RECORD' && websocketData.push(JSON.parse(streamEvent.data).data.value);
subject.next(websocketData);
JSON.parse(streamEvent.data).type === 'END' && subject.complete();
};
};
};

(async function () {
requestToWSEndpoint();

const chartData = subject.subscribe((event) => console.log(event, 'Event')); // Event Adds to the Array and at the End I have all my Items(Filtered). It printed 100 Times.
console.log('ARRAY', chartData); // This returns [Subscriber {closed: false, _parentOrParents: null, _subscriptions: Array(1), syncErrorValue: null, syncErrorThrown: false, …}]. This is what I want. The Array.
}());

最佳答案

我的建议如我的评论中所述:

const subject = new Subject();
...
}
(async function () {
wsRequest();
subject.pipe(finalize(()=> {console.log('ARRAY', websocketData);})).subscribe();
}());

实际上,您甚至不需要在 wsRequest 中执行操作的函数,除了 const reqToken = waitauthenticationRequest();。其余的可以轻松地在全局范围内进行。

您应该查看rxjs 运算符的文档。

关于javascript - RXJS Stream - 如何等待我的流结束并返回数据,然后执行我的操作,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61128017/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com