gpt4 book ai didi

javascript - 如何通过重新连接实现 shareReplay?

转载 作者:行者123 更新时间:2023-11-30 00:10:22 25 4
gpt4 key购买 nike

在下面的代码中,我创建了一个简单的可观察对象,它产生一个值然后完成。然后我分享那个可观察的重播最后一个项目并订阅 3 次。紧随其后的第一次,第二次在产生值(value)之前,第三次在产生值(value)并完成 observable 之后。

let i = 0;
let obs$ = Rx.Observable.create(obs => {
console.log('Creating observable');
i++;
setTimeout(() => {
obs.onNext(i);
obs.onCompleted();
}, 2000);
}).shareReplay(1);

obs$.subscribe(
data => console.log(`s1: data = ${data}`),
() => {},
() => console.log('finish s1')
);

setTimeout( () => {
obs$.subscribe(
data => console.log(`s2: data = ${data}`),
() => {},
() => console.log('finish s2')

);
}, 1000);

setTimeout( () => {
obs$.subscribe(
data => console.log(`s3: data = ${data}`),
() => {},
() => console.log('finish s3')

);
}, 6000);

您可以 execute this on jsbin

这导致以下弹珠图

Actual
s1: -----1$
s2: \--1$
s3: \1$

但我希望

Expected
s1: -----1$
s2: \--1$
s3: \----2$

我能理解为什么有人想要第一个行为,但我的推理是,与我返回数字的这个例子不同,我可能会返回一个容易受到取消订阅行为影响的对象,例如数据库连接.如果上面的弹珠图表示一个数据库连接,在我调用 db.close() 的 dispose 方法中,在第三次订阅时我会有一个异常,因为我接收到一个数据库作为值被释放的处理程序。 (因为当第二个订阅完成时 refCount = 0 并且源被处置)。

这个例子还有另一个奇怪的事情,就是即使它是用第一个值并紧随其后完成,它订阅源两次(正如您可以通过重复的“创建可观察对象”看到的那样)

我知道this github issue谈论这个但我想念的是:

如何实现(在 RxJs4 和 5 中)一个共享的 observable,它可以在源 observable 尚未完成的情况下重放最后一个项目,如果它已完成(refCount = 0),则重新创建 observable。

在 RxJs5 中,我认为共享方法解决了我问题的重新连接部分,但没有解决共享部分。

在 RxJs4 中我一无所知

如果可能,我想使用现有的运算符或主题来解决这个问题。我的直觉告诉我,我必须用这样的逻辑创建一个不同的主题,但我还没有完全做到这一点。

最佳答案

关于 shareReplay 的一点:

shareReplay保持相同的基础 ReplaySubject返回的可观察对象的剩余生命周期的实例。

一次ReplaySubject完成后,您不能再向其中添加任何值,但它仍会重播。所以……

  1. 您第一次订阅可观察对象,超时开始。这会增加 i来自 01 .
  2. 您第二次订阅了 observable 并且超时已经过去了。
  3. 超时回调触发并发送onNext(i) , 然后 onCompleted() .
  4. onCompleted()信号完成 ReplaySubjectshareReplay里面,这意味着从现在开始,共享的可观察对象将简单地重放它拥有的值(即 1)并完成。

关于一般共享可观察对象的一点:

另一个独立的问题是,由于您共享了可观察对象,它只会调用订阅者函数一次。这意味着 i只会增加一次。所以即使你没有 onCompleted并杀死你的潜在ReplaySubject ,你最终不会将它增加到 2 .

这不是 RxJS 5

一个快速判断的方法是 onNext对比next .您当前在示例中使用的是 RxJS 4,但是您已经使用 RxJS 5 标记了它,并且您在 RxJS 5 中发现了一个问题。RxJS 5 是测试版,是一个完全重写 RxJS 4 的新版本。 API 更改主要是为了匹配 es-observable proposal which is currently at stage 1

更新示例

I've updated your example to give you your expected results

基本上,您希望为前两次调用使用共享版本的可观察对象,为第三次调用使用原始可观察对象。

let i = 0;
let obs$ = Rx.Observable.create(obs => {
console.log('Creating observable');
i++;
setTimeout(() => {
obs.onNext(i);
obs.onCompleted();
}, 2000);
})


let shared$ = obs$.shareReplay(1);

shared$.subscribe(
data => console.log(`s1: data = ${data}`),
() => {},
() => console.log('finish s1')
);

setTimeout( () => {
shared$.subscribe(
data => console.log(`s2: data = ${data}`),
() => {},
() => console.log('finish s2')

);
}, 1000);

setTimeout( () => {
obs$.subscribe(
data => console.log(`s3: data = ${data}`),
() => {},
() => console.log('finish s3')

);
}, 6000);

无关

此外,protip:请务必为调用 clearTimeout 的自定义可观察对象返回取消语义。 .

关于javascript - 如何通过重新连接实现 shareReplay?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36753392/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com