gpt4 book ai didi

javascript - RxJs:当所有取消订阅时中止延迟和共享的可观察对象

转载 作者:行者123 更新时间:2023-12-04 13:28:13 25 4
gpt4 key购买 nike

我想创建一个 rxjs Observable运行长轮询操作。
每次迭代都会发出中间结果。
isComplete返回真,Observable完成。
此功能应如下所示

  • 它应该只在有 时才开始至少一位订阅者
  • 应该允许多个订阅者 分享结果
  • 如果存在 ,则应中止轮询并取消调用没有订阅者

  • 以下代码正常工作并满足条件(1)和(2):
    function longPollingAction(fetch: () => Promise<Response>, cancel: () => {}): Observable<Response> {
    return defer(() => { // defer to start running when there's a single subscriber
    return from(fetch()).pipe(
    expand(() => timer(1000).pipe(switchMap(fetch))),
    takeWhile<Response>(isComplete, false),
    );
    }).pipe(share()); // share to allow multiple subscribers
    }

    function isComplete(r: Response): boolean {
    // returns true if r is complete.
    }

    如何修改此代码以满足(3)?在当前的实现中,轮询停止了,但我如何调用 cancel ?

    最佳答案

    使用完成
    您可以使用 finalize 调用取消.这可能是这样的:

    function longPollingAction(
    fetch: () => Promise<Response>,
    cancel: () => void
    ): Observable<Response> {
    // defer to turn eager promise into lazy observable
    return defer(fetch).pipe(
    expand(() => timer(1000).pipe(switchMap(fetch))),
    takeWhile<Response>(isComplete, false),
    finalize(cancel),
    share() // share to allow multiple subscribers
    );
    }

    function isComplete(r: Response): boolean {
    // returns true if r is complete.
    }
    回电 completeTap 运算符(operator)可以访问 next , error , 和 complete排放。对于 callback: () => void ,这就够了。
    function longPollingAction(
    fetch: () => Promise<Response>,
    cancel: () => void
    ): Observable<Response> {
    // defer to turn eager promise into lazy observable
    return defer(fetch).pipe(
    expand(() => timer(1000).pipe(switchMap(fetch))),
    takeWhile<Response>(isComplete, false),
    tap({
    complete: cancel
    }),
    share() // share to allow multiple subscribers
    );
    }

    function isComplete(r: Response): boolean {
    // returns true if r is complete.
    }
    回电 unsubscribe我不 这样的运算符存在,但我们可以很容易地制作一个。如果取消订阅,此运算符只会触发回调。它将忽略 error , 和 complete .
    function onUnsubscribe<T>(
    fn: () => void
    ): MonoTypeOperatorFunction<T> {
    return s => new Observable(observer => {
    const bindOn = name => observer[name].bind(observer);
    const sub = s.subscribe({
    next: bindOn("next"),
    error: bindOn("error"),
    complete: bindOn("complete")
    });

    return {
    unsubscribe: () => {
    fn();
    sub.unsubscribe()
    }
    };
    });
    }
    然后你可以像这样使用它:
    function longPollingAction(
    fetch: () => Promise<Response>,
    cancel: () => void
    ): Observable<Response> {
    // defer to turn eager promise into lazy observable
    return defer(fetch).pipe(
    expand(() => timer(1000).pipe(switchMap(fetch))),
    takeWhile<Response>(isComplete, false),
    onUnsubscribe(cancel),
    share() // share to allow multiple subscribers
    );
    }

    function isComplete(r: Response): boolean {
    // returns true if r is complete.
    }
    share正在管理您的订阅,分享只会取消订阅一次 refCount < 1 ,那么在这种情况下调用取消的唯一方法是没有订阅者。

    关于javascript - RxJs:当所有取消订阅时中止延迟和共享的可观察对象,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/66801539/

    25 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com