gpt4 book ai didi

rxjs - shareReplayLatestWhileConnected with RxJS

转载 作者:行者123 更新时间:2023-12-04 17:30:33 26 4
gpt4 key购买 nike

我正在像这样创建我的可观察源(每 5 秒调用一次 api):
const obs$ = Observable.interval(5000).switchMap(() => makeApiCall());
我想修改$obs使其具有以下特点:

  • 仅当至少有 1 个订阅者
  • 时才启动 observable
  • 多播。 IE。如果我 obs$.subscribe(...)两次,底层代码makeApiCall()应该只运行一次。
  • 任何订阅者在任何时候都应该立即获得最后一个发出的值(而不是等待〜5s直到下一个值发出)
  • 可重试。如果有一个 makeApiCall()错误,我希望(如果可能)所有订阅者都收到错误通知,但重新连接到 $obs , 并继续做 makeApiCall()每5秒

  • 到目前为止,我发现了以下线索:

    看来我需要创建一个 BehaviorSubject myBehaviorSubject , 做一次订阅 obs$.subscribe(myBehaviorSubject) , 任何其他观察者都应该订阅 myBehaviorSubject .不确定这是否回答了“可重试”部分。

    我也看了shareReplay,好像是 $obs.shareReplay(1)会做的伎俩(4个要求)。如果我理解正确,它会将 ReplaySubject(1) 订阅到源 observable,并且 future 的观察者订阅此 ReplaySubject。是否有等效的 shareBehavior?

    在 RxSwift 中,我发现了 shareReplayLatestWhileConnected,这似乎是我想象中的 shareBehavior。但它在 RxJS 中不存在。

    任何想法什么是实现这一目标的最佳方法?

    最佳答案

    正如你提到的,shareReplay(1)几乎可以让你到达那里。它将向当前订阅者多播响应并将最后一个值(如果有)重播给新订阅者。这似乎是您想要的,而不是 shareBehavior (如果存在)因为您正在调用 api 并且没有初始值。

    你应该知道shareReplay将创建对源流的订阅,但仅在 refCount === 0 时取消订阅并且源流终止(错误或完成)。这意味着在第一次订阅之后,间隔将开始,即使没有更多订阅,它也会继续。

    如果您想在无人订阅时停止间隔,请使用 multicast(new ReplaySubject(1)).refCount() .多播运算符(operator)将创建对源流的单个订阅,并将所有值推送到作为实例 (multicast(new Subject())) 或由工厂 (multicast(() => new Subject())) 提供的主题中。多播后流的所有订阅者都将订阅多播主题。因此,当一个值流经多播运营商时,其所有订阅者都将获得该值。您可以更改传递给多播的主题类型以更改其行为。在您的情况下,您可能需要 ReplaySubject以便它将最后一个值重播给新订阅者。您可以使用 BehaviorSubject如果你觉得这满足你的需要。

    现在multicast运算符是 connectable这意味着您必须调用 connect()在流上使它变热。 refCount operator 基本上使可连接的 observable 行为类似于普通的 observable,因为它在订阅时会变热,但在没有订阅者时会变冷。这样做是为了保持一个内部引用计数(因此得名 refCount)。当refCount === 0它会断开连接。

    这与 shareReplay(1) 相同。有一个微小但重要的区别是,当没有更多订阅者时,它将取消订阅源流。如果您在订阅源时使用工厂方法创建新主题(例如:multicast(() => new ReplaySubject(1))),那么当流从热到冷再到热时,您将失去值(value),因为每次它都会创建一个新主题变热。如果您想在源订阅之间保持相同的主题,那么您可以传入主题而不是工厂(例如:multicast(new ReplaySubject(1)) 或使用其别名 publishReplay(1)

    至于您向订阅者提供错误然后重新订阅的最后要求,您不能调用 error订阅回调,然后继续获取 next 上的值打回来。如果未处理的错误到达订阅,它将终止订阅。因此,如果您希望您的订阅看到它并且仍然存在,您必须在它到达之前将其捕获并将其转换为普通消息。你可以这样做:catch((err) => of(err))并以某种方式标记它。如果你想静音然后返回 empty() .

    如果您想立即重试,可以使用 retryWhen运算符,但您可能希望将其放在共享运算符之前以使其通用。但是,这也可以防止您的订阅者知道错误。由于您的流的根是一个间隔,并且错误来自 switchMap 返回的内部可观察对象,该错误不会终止流的源,但可能会终止订阅。因此,只要您处理错误 (catch/catchError),就会在下一个间隔重试 api 调用。

    此外,您可能需要 timer(0, 5000)而不是间隔,以便您的 api 调用立即触发,然后以 5 秒的间隔触发。

    所以我建议如下:

    let count = 0;
    function makeApiCall() {
    return Rx.Observable.of(count++).delay(1000);
    }

    const obs$ = Rx.Observable.timer(0, 5000)
    .switchMap(() => makeApiCall().catch(() => Rx.Observable.empty()))
    .publishReplay(1)
    .refCount();

    console.log('1 subscribe');
    let firstSub = obs$.subscribe((x) => { console.log('1', x); });
    let secondSub;
    let thirdSub;

    setTimeout(() => {
    console.log('2 subscribe');
    secondSub = obs$.subscribe((x) => { console.log('2', x); });
    }, 7500);

    setTimeout(() => {
    console.log('1 unsubscribe');
    firstSub.unsubscribe();
    console.log('2 unsubscribe');
    secondSub.unsubscribe();
    }, 12000);

    setTimeout(() => {
    console.log('3 subscribe');
    thirdSub = obs$.subscribe((x) => { console.log('3', x); });
    }, 17000);

    setTimeout(() => {
    console.log('3 unsubscribe');
    thirdSub.unsubscribe();
    }, 30000);
    <script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.10/Rx.min.js"></script>


    为方便起见,这里是多播的别名:
    publish() === multicast(new Subject())
    publishReplay(#) === multicast(new ReplaySubject(#))
    publishBehavior(value) === multicast(new BehaviorSubject(value))

    关于rxjs - shareReplayLatestWhileConnected with RxJS,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49903028/

    26 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com