gpt4 book ai didi

rxjs 5 发布重播引用计数

转载 作者:行者123 更新时间:2023-12-03 13:47:43 26 4
gpt4 key购买 nike

我想不通 publishReplay().refCount()作品。

例如( https://jsfiddle.net/7o3a45L1/ ):

var source = Rx.Observable.create(observer =>  {
console.log("call");
// expensive http request
observer.next(5);
}).publishReplay().refCount();

subscription1 = source.subscribe({next: (v) => console.log('observerA: ' + v)});
subscription1.unsubscribe();
console.log("");

subscription2 = source.subscribe({next: (v) => console.log('observerB: ' + v)});
subscription2.unsubscribe();
console.log("");

subscription3 = source.subscribe({next: (v) => console.log('observerC: ' + v)});
subscription3.unsubscribe();
console.log("");

subscription4 = source.subscribe({next: (v) => console.log('observerD: ' + v)});
subscription4.unsubscribe();

给出以下结果:

call observerA: 5

observerB: 5 call observerB: 5

observerC: 5 observerC: 5 call observerC: 5

observerD: 5 observerD: 5 observerD: 5 call observerD: 5



1)为什么observerB、C、D被多次调用?

2)为什么“调用”打印在每一行而不是行首?

另外,如果我调用 publishReplay(1).refCount() ,它分别调用了观察者 B、C 和 D 2 次。

我期望的是每个新观察者都只收到一次值 5 并且“调用”只打印一次。

最佳答案

publishReplay(x).refCount()组合执行以下操作:

  • 它创建了一个 ReplaySubject最多可重播 x 次排放。如果 x 未定义,则它重放完整的流。
  • 它使这个 ReplaySubject使用 refCount() 运算符兼容多播。这导致并发订阅接收相同的发射。

  • 您的示例包含一些使它们如何协同工作的问题。请参阅以下修改后的片段:

    var state = 5
    var realSource = Rx.Observable.create(observer => {
    console.log("creating expensive HTTP-based emission");
    observer.next(state++);
    // observer.complete();

    return () => {
    console.log('unsubscribing from source')
    }
    });


    var source = Rx.Observable.of('')
    .do(() => console.log('stream subscribed'))
    .ignoreElements()
    .concat(realSource)
    .do(null, null, () => console.log('stream completed'))
    .publishReplay()
    .refCount()
    ;

    subscription1 = source.subscribe({next: (v) => console.log('observerA: ' + v)});
    subscription1.unsubscribe();

    subscription2 = source.subscribe(v => console.log('observerB: ' + v));
    subscription2.unsubscribe();

    subscription3 = source.subscribe(v => console.log('observerC: ' + v));
    subscription3.unsubscribe();

    subscription4 = source.subscribe(v => console.log('observerD: ' + v));
    <script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.1.0/Rx.js"></script>


    运行这段代码时,我们可以清楚地看到它没有为观察者 D 发出重复的值,实际上它为每个订阅创建了新的排放。怎么来的?

    每次订阅都会在下一次订阅发生之前取消订阅。这有效地使 refCount 减少回零,没有进行多播。

    问题在于 realSource流未完成。因为我们没有多播,所以下一个订阅者会得到一个新的 realSource 实例。通过 ReplaySubject 并且新的排放量在先前已经排放的排放量之前。

    因此,要通过多次调用昂贵的 HTTP 请求来修复您的流,您必须完成该流,以便 publishReplay 知道它不需要重新订阅。

    关于rxjs 5 发布重播引用计数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42189801/

    26 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com