gpt4 book ai didi

javascript - RxJS 减少一个 ReplaySubject

转载 作者:行者123 更新时间:2023-12-04 18:34:41 27 4
gpt4 key购买 nike

我正在使用 ReactiveX/RxJS 版本。

假设我有一个 Rx.ReplaySubject,它每 2 秒发出一个包含 id 和带有值的数组的对象。我想减少这个值数组并得到它们的总和。

问题是 ReplaySubject 是一个热的可观察对象并且它永远不会完成,至少我不希望它完成,因为我想要每 2 秒该对象值的总和。但是为了使用 reduce 运算符,应该完成 observable。那么,我应该如何进行呢?

E.G 不工作代码:

var subject = new Rx.ReplaySubject();

subject.
map(x => x.transactions).
// Reduce never concludes because ReplaySubject instance is not completed
reduce((v1, v2) => v1+v2, 0).
subscribe(function (value) {
console.log(value)
});

setInterval(injectData, 2000);

function injectData () {
subject.next({id: Date.now(), transactions: [
{value: Math.round(Math.random() * 5000)},
{value: Math.round(Math.random() * 5000)},
{value: Math.round(Math.random() * 5000)},
{value: Math.round(Math.random() * 5000)},
{value: Math.round(Math.random() * 5000)}
]});
}

最佳答案

考虑使用 Observable.prototype.scan() (RxJS documentation)
scan()reduce() 不同,基本上聚合一个 observable 并发出每个连续值仅在完成时发出结果。 (参见 scanreduce 的 Rx 解释)

使用 OP 代码的示例(这里是 fiddle ):

var subject = new Rx.ReplaySubject();

subject
// note: use "selectMany" to flatten observable of observables
.selectMany(x => Rx.Observable.fromArray(x.transactions))
// note: use "scan" to aggregate values
.scan((agg, val) => agg+val.value, 0)
.subscribe(function (value) {
console.log(value)
});

setInterval(injectData, 2000);

function injectData () {
subject.onNext({id: Date.now(), transactions: [
{value: Math.round(Math.random() * 5000)},
{value: Math.round(Math.random() * 5000)},
{value: Math.round(Math.random() * 5000)},
{value: Math.round(Math.random() * 5000)},
{value: Math.round(Math.random() * 5000)}
]});
}

另一个例子:

由于 selectMany(),上面的代码为每个事务发出聚合。 .如果您只希望它每 2 秒发射一次,这是使用 reduce() 的好时机像这样(这是另一个 fiddle ):
subject
// note: use "selectMany" to flatten observable of observables
// note: using "reduce" inside here so that we only emit the aggregate
.selectMany(x =>
Rx.Observable
.fromArray(x.transactions)
.reduce((agg, val) => agg + val.value, 0)
)
// note: use "scan" to aggregate values
.scan((agg, val) => agg+val, 0)
.subscribe(function (value) {
console.log(value)
});

附加说明:

Rx Subjects 可以完成;您只需调用 onCompleted()当你准备好时。如果你完成了你的主题,你仍然可以使用 reduce() .比较这个 fiddle与上面的那个。

关于javascript - RxJS 减少一个 ReplaySubject,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36075342/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com