gpt4 book ai didi

rxjs - takeWhile 完成后的链式 Observable 没有被调用?

转载 作者:行者123 更新时间:2023-12-01 12:19:47 28 4
gpt4 key购买 nike

我有以下应该这样调用的方法:

  • registerDomain应该被调用并且应该返回一个 operationId
  • 之后 10秒,getOperationDetail应该调用传入 operationId
  • getOperationDetail应该每 10 调用一次距离 successful 的秒数被退回。
  • 一次getOperationDetail完成,createRecordSets应该调用。
  • 最后,getChangeStatus应该被调用,直到它返回 INSYNC
  • 如果任何 api 调用抛出异常,我该如何处理客户端的错误?

  • 下面的代码调用了 registerDomain 和 getOperationDetail,但是在 getOperationDetail 完成后,它并没有移动到 createRecordSets。
      registerDomain(domain) {
    return this._adminService.registerDomain(domain)
    .concatMap(operation => this.getOperationDetail(operation.OperationId))
    .concatMap(() => this._adminService.createRecordSets(domain));
    }

    getOperationDetail(operationId) {
    return Observable.interval(10000)
    .mergeMap(() => this._adminService.getOperationDetail(operationId))
    .takeWhile((info) => info.Status.Value !== 'SUCCESSFUL');
    }
    createRecordSets(caseWebsiteUrl) {
    return this._adminService.createRecordSets(caseWebsiteUrl.Url)
    .concatMap(registerId => this.getChangeStatus(registerId));
    }

    getChangeStatus(registerId) {
    return Observable.interval(5000)
    .mergeMap(() => this._adminService.getChange(registerId))
    .takeWhile((info) => info.ChangeInfo.Status.Value !== 'INSYNC');
    }

    我更新了 getOperationDetail使用 first运算符(operator):
      getOperationDetail(operationId) {
    return Observable.interval(3000)
    .mergeMap(() => this._adminService.getOperationDetail(operationId))
    .first((info) => info.Status.Value === 'SUCCESSFUL')

    }

    现在它实际上调用了 createRecordSets ,但是,在 createRecordSets 之后,它继续调用 getOperationDetail大约 13 次,最终调用 getChangeStatus .我看着它的方式,我认为它会:
  • 调用 getOperationDetail直到收到 SUCCESS .
  • 调用 createRecordSets一度。
  • 调用 getChangeStatus直到收到 INSYNC
  • 完毕。

  • 为什么要额外调用?

    我将 registerDomain 更改为如下所示:
     registerDomain(domain) {
    return this._adminService.registerDomain(domain)
    .concatMap(operation => this.getOperationDetail(operation.OperationId))
    .concatMap((op) => this.createRecordSets(op));

    在我拥有 .concatMap((op) => this.createRecordSets(op)) 之前紧跟在 this.getOperationDetail 之后.一旦我把它移到外面,它就开始按预期工作。我不确定为什么。有人可以解释吗?

    最佳答案

    takeWhile满足满足指定条件的值,它完成了 observable 而不传播该值。这意味着下一个链式运算符将不会接收该值并且不会调用其回调。

    假设在您的示例中,前两次调用 this._adminService.getOperationDetail(...)导致不成功状态,第三次调用成功。这意味着 getOperationDetail() 返回的 observable只会产生两个 info值每个具有非成功状态的值。还有什么可能也很重要,下一个链式 concatMap运算符将根据每个不成功的值调用其回调,这意味着 createRecordSets()会被调用两次。我想你可能想避免这种情况。

    我建议使用 first运算符:

    getOperationDetail(operationId) {
    return Observable.interval(10000)
    .concatMap(() => this._adminService.getOperationDetail(operationId))
    .first(info => info.Status.Value !== 'SUCCESSFUL');
    }

    这边 getOperationDetail()只要 this._adminService.getOperationDetail(operationId) 将只产生一个“成功”值成功。 first 运算符发出与指定条件匹配的源 observable 的第一个值,然后完成。

    在错误处理方面, catch retry 运算符可能有用。

    更新:

    您遇到的意外行为( getOperationDetail()first() 完成后不断被调用)似乎是 bugrxjs .如 this issue 中所述,

    every take-ish operator (one that completes earlier than its source Observable), will keep subscribing to source Observable when combined with operator that prolongs subscription (here switchMap).



    两个 firsttakeWhile例如, switchMap 是此类接受操作符和“延长”订阅的操作符的示例。 , concatMapmergeMap .在下面的示例中,数字将在 concatMap 的内部可观察时保持记录。正在发射值:
    var takeish$ = Rx.Observable.interval(200)
    // will keep logging until inner observable of `concatMap` is completed
    .do(x => console.log(x))
    .takeWhile(x => x < 2);

    var source = takeish$
    .concatMap(x => Rx.Observable.interval(200).take(10))
    .subscribe();

    看起来可以通过将包含这样一个 take-ish 运算符的 observable 转换为高阶 observable 来解决这个问题——就像你所做的那样:
    var takeish$ = Rx.Observable.interval(200)
    // will log only 0, 1, 2
    .do(x => console.log(x))
    .takeWhile(x => x < 2);

    var source = Rx.Observable.of(null)
    .switchMap(() => takeish$)
    .concatMap(x => Rx.Observable.interval(200).take(1))
    .subscribe();

    更新 2:

    从 rxjs 版本 5.4.2 开始,似乎上述错误仍然存​​在。例如,它影响 first 的源 observable 是否存在。运算符将在 first 时取消订阅满足规定条件。当 first运算符后紧跟 concatMap ,它的源 observable 不会被取消订阅,并且会一直发射值直到 concatMap 的内部 observable完成。在您的情况下,这意味着 this._adminService.getOperationDetail()会一直被调用,直到 createRecordSets() 返回 observable会完成的。

    这是您的示例简化以说明行为:

    function registerDomain() {
    return Rx.Observable.of("operation")
    .concatMap(() => getOperationDetail()
    .concatMap(() => Rx.Observable.interval(200).take(5)));
    }

    function getOperationDetail() {
    return Rx.Observable.interval(100)
    // console.log() instead of the actual service call
    .do(x => console.log(x))
    .first(x => x === 2);
    }

    registerDomain().subscribe();
    <script src="https://unpkg.com/@reactivex/rxjs@5.0.3/dist/global/Rx.js"></script>


    如果我们扩展第一个 concatMap 的内部 observable运算符,我们将得到以下 observable:
    Rx.Observable.interval(100)
    .do(x => console.log(x))
    .first(x => x === 2)
    .concatMap(() => Rx.Observable.interval(200).take(5));

    请注意 first紧随其后的是 concatMap这会阻止 first 的可观察源运算符(operator)(即 interval(100).do(x => console.log(x) )被取消订阅。值将继续记录(或者在您的情况下,将继续发送服务调用)直到 concatMap 的内部可观察值(即 interval(200).take(5) )完成。

    如果我们修改上面的例子并移动第二个 concatMap从第一个 concatMap 的内部 observable , first将不再与它链接,并且将在满足条件后立即取消订阅源 observable,这意味着该间隔将停止发出值并且不会记录更多数字(或不会发送更多服务请求):

    function registerDomain() {
    return Rx.Observable.of("operation")
    .concatMap(() => getOperationDetail())
    .concatMap(() => Rx.Observable.interval(200).take(5));
    }

    function getOperationDetail() {
    return Rx.Observable.interval(100)
    // console.log() instead of the actual service call
    .do(x => console.log(x))
    .first(x => x === 2);
    }

    registerDomain().subscribe();
    <script src="https://unpkg.com/@reactivex/rxjs@5.0.3/dist/global/Rx.js"></script>


    在这种情况下,内部 observable 可以简单地扩展为:
    Rx.Observable.interval(100)
    .do(x => console.log(x))
    .first(x => x === 2)

    请注意 first不再跟随 concatMap .

    还值得一提的是,在这两种情况下, registerDomain() 都返回了 observable。产生完全相同的值,如果我们从 do() 移动日志记录运算符到 subscribe() ,在这两种情况下,相同的值将被写入控制台:
    registerDomain.subscribe(x => console.log(x));

    关于rxjs - takeWhile 完成后的链式 Observable 没有被调用?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45312781/

    28 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com