gpt4 book ai didi

Rxjs - refCount 变为 0 后出现意外的 publishReplay + refCount 行为

转载 作者:行者123 更新时间:2023-12-03 17:31:09 24 4
gpt4 key购买 nike

我的用例是这样的:我使用 websocket 连接到服务,并从服务中获取定期(但不可预测)的健康数据。该应用程序可能有此数据流的多个用户,因此我想共享它。新订阅者应该看到最近发布的健康数据。当没有更多订阅者时,我还想关闭 websocket。
我的应用程序使用 shareReplay(1)很长一段时间,直到发现它泄漏了底层连接( https://blog.strongbrew.io/share-replay-issue/ )。此时我们改为pipe(publishReplay(1), refCount) .原来这也有一个我没想到的微妙之处:

  • 订阅者 A 连接并建立 websocket 连接。
  • 订阅者 B 正确连接和共享,并获取最新数据。
  • A 和 B 都断开连接。 websocket 被拆除
  • 订阅者 C 连接,但只需要一个值 take(1) . publishReplay(1) 缓存的值被退回。

  • 在第 4 步中,我真的希望重新创建 websocket。缓存的值没有用。 publishReplay的时间窗口参数很诱人,但也不是我想要的。
    我设法找到了解决方案,通过使用 pipe(multicast(() => new ReplaySubject(1)), refCount()) ,但我对 Rx 的了解不够充分,无法理解其全部含义。
    我的问题是 - 实现我想要的行为的最佳方法是什么?
    谢谢!
    代码示例见 https://repl.it/@bradb/MinorColdRouter
    内联代码
        const { Observable, ReplaySubject } = require('rxjs');
    const { tap, multicast, take, publishReplay, refCount } = require('rxjs/operators');

    const log = console.log;

    function eq(a, b) {
    let result = JSON.stringify(a) == JSON.stringify(b);
    if (!result) {
    log('eq failed', a, b);
    }
    return result;
    }

    function assert(cond, msg) {
    if (!cond) {
    log('****************************************');
    log('Assert failed: ', msg);
    log('****************************************');
    }
    }

    function delay(t) {
    return new Promise(resolve => {
    setTimeout(resolve, t);
    });
    }

    let liveCount = 0;

    // emitValue 1 happens at 100ms, 2 at 200ms etc
    function testSource() {
    return Observable.create(function(observer) {
    let emitValue = 1;
    liveCount++;
    log('create');
    let interv = setInterval(() => {
    log('next --------> ', emitValue);
    observer.next(emitValue);
    emitValue++;
    }, 100);

    return () => {
    liveCount--;
    log('destroy');
    clearInterval(interv);
    };
    });
    }

    async function doTest(name, o) {
    log('\nDOTEST: ', name);
    assert(liveCount === 0, 'Start off not live');
    let a_vals = [];
    o.pipe(take(4)).subscribe(x => {
    a_vals.push(x);
    });
    await delay(250);
    assert(liveCount === 1, 'Must be alive');

    let b_vals = [];
    o.pipe(take(2)).subscribe(x => {
    b_vals.push(x);
    });
    assert(liveCount === 1, 'Two subscribers, one source');
    await delay(500);
    assert(liveCount === 0, 'source is destroyed');
    assert(eq(a_vals, [1, 2, 3, 4]), 'a vals match');
    assert(eq(b_vals, [2, 3]), 'b vals match');

    let c_vals = [];
    o.pipe(take(2)).subscribe(x => {
    c_vals.push(x);
    });
    assert(liveCount === 1, 'Must be alive');

    await delay(300);
    assert(liveCount === 0, 'Destroyed');
    assert(eq(c_vals, [1, 2]), 'c_vals match');
    }

    async function main() {
    await doTest(
    'bad: cached value is stale',
    testSource().pipe(
    publishReplay(1),
    refCount()
    )
    );
    await doTest(
    'good: But why is this different to publish replay?',
    testSource().pipe(
    multicast(() => new ReplaySubject(1)),
    refCount()
    )
    );
    await doTest(
    'bad: But why is this different to the above?',
    testSource().pipe(
    multicast(new ReplaySubject(1)),
    refCount()
    )
    );
    }
    main();

    最佳答案

    改写来自 carant 的评论:publishReplay将使用 单个 ReplaySubject 在引擎盖下,这是取消订阅,然后由 refCount 重新订阅.因此它的缓存值被重放。当您使用 multicast使用工厂,每次都会创建一个新的 ReplaySubject refCount取消 - 然后重新订阅 - 因此,没有缓存值。
    这是 carant 的链接,因为无法访问评论中的链接:

  • https://ncjamieson.com/understanding-publish-and-share/
  • https://ncjamieson.com/how-to-use-refcount/

  • 从文章:

    The multicasting infrastructure’s subject is able to be re-subscribed.

    publishReplay用途 multicast在引擎盖下,不提供工厂,但重新使用相同的 ReplaySubject。

    关于Rxjs - refCount 变为 0 后出现意外的 publishReplay + refCount 行为,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53798142/

    24 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com