gpt4 book ai didi

javascript - RxJS:将历史数据与更新流相结合

转载 作者:塔克拉玛干 更新时间:2023-11-02 20:56:01 29 4
gpt4 key购买 nike

场景:

我正在通过一个简单的 ajax 调用加载一个初始数据数组,并将该数据放入 Observable,我将其称为历史。同时,我连接到一个 websocket 并定期接收数据,我们称之为更新,我想将此数据附加到历史

具体来说,假设 ajax 调用发回数组 [0,1,2] 并且套接字(随时间)发出 34, 5 然后我想像这样累积这些值:

[0,1,2]        // historical
[0,1,2,3] // historical + updates1
[0,1,2,3,4] // historical + updates1 + updates2
[0,1,2,3,4,5] // etc

(请注意,这里有一个必须处理的并发边缘情况:historical 可能会产生 [0,1,2,3] 而第一个两个 updates34,在这种情况下我想要结束的仍然是 [0,1, 2,3,4]不是 [0,1,2,3,3,4]。)

最终目标是以单个 Observable stream 结束,它是 Observables historicalupdates 的组合,如所述。

到目前为止我尝试了什么:

仅累积 websocket 数据就足够容易了。我创建了 updates,这是 websocket 发出的 Observable 序列。每次观察到一个值时,我都可以使用 scan() 将其累积到数组中:

updates.scan((acc, update) => acc.concat([update]), [])

这会产生类似的结果

[3]
[3,4]
[3,4,5]

我的下一个问题是如何将它与历史 结合起来。由于 historical 的数据可能在已经观察到一个或多个 updates 之后到达,因此在我们等待 historical 时需要积累这些更新。我设法使用 withLatestFrom() 实现了这一点:

const stream = historical
.withLatestFrom(
updates.scan((acc, update) => acc.concat([update]), []),
(history, buffer) => history.concat(buffer) /* could eliminate duplicates here */
)

观察 stream 产生一个值,[0,1,2,3,4,5],它是 historical 的组合以及在历史之前到达的任何更新。正是我想要的。

但是,我不知道从那里去哪里。我怎样才能继续将 updates 附加到 stream 以便随着时间的推移,stream 产生如下内容:

[0,1,2,3,4,5]
[0,1,2,3,4,5,6]
[0,1,2,3,4,5,6,7]

我没有看到为此使用 scan 的方法,就像我为 updates 所做的那样,因为在这种情况下我需要 scan 的初始(种子)值是一个 Observable,而不是一个数组。

有没有一种方法可以做到这一点——可以通过添加我目前已有的方法,还是可以使用更好的替代方法来完成整个事情?

最佳答案

如果我没理解错,我会使用 skipUntil()运算符(operator)继续收集更新而不进一步发出它们。然后对于 withLatestFrom() 运算符,我会选择 updates Observable 作为它的来源。这将等待 skipUntil(),直到历史数据可用,然后在 updates 的每次发射时发出。

let updates = Observable
.timer(0, 1000)
.scan((acc, update) => {
acc.push(update);
return acc;
}, []);

let historical = Observable.defer(() => {
console.log('Sending AJAX request ...');
return Observable.of(['h1', 'h2', 'h3']);
})
.delay(3000)
.share();


const stream = updates.skipUntil(historical)
.withLatestFrom(historical, (buffer, history) => {
return history.concat(buffer);
})
.map(val => val) // remove duplicates;


stream.subscribe(val => console.log(val));

控制台输出如下:

Sending AJAX request ...
["h1", "h2", "h3", 0, 1, 2, 3]
["h1", "h2", "h3", 0, 1, 2, 3, 4]
["h1", "h2", "h3", 0, 1, 2, 3, 4, 5]

查看现场演示:https://jsbin.com/kumolez/11/edit?js,console

我不知道你的用例是什么,但我会尽量避免使用 concat() 因为当 buffer 增长时它可能会变慢。

此外,如果您在更新到达一个项目时发出更新(而不是累积它们),您可以使用 distinct() 运算符来过滤掉重复项。

顺便说一句,我假设您使用的是 RxJS 5。

关于javascript - RxJS:将历史数据与更新流相结合,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42235610/

29 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com