gpt4 book ai didi

javascript - RXJS中异步流是如何传输的?

转载 作者:搜寻专家 更新时间:2023-11-01 04:29:26 25 4
gpt4 key购买 nike

我想了解流是如何通过 RXjs 中的管道传输的。
我知道这不应该是一个问题,因为这是异步流的全部想法 - 但仍然有一些我想了解的东西。

查看这段代码:

var source = Rx.Observable
.range(1, 3)
.flatMapLatest(function (x) { //`switch` these days...
return Rx.Observable.range(x*100, 2);
});


source.subscribe(value => console.log('I got a value ', value))

结果:

I got a value 100
I got a value 200
I got a value 300
I got a value 301

我相信 (IIUC) 图表是这样的:(通知已取消订阅的 101,201)

----1---------2------------------3------------------------------|

░░░░░░░░flatMapLatest(x=>Rx.Observable.range(x*100, 2))░░░░░░░░
-----100-------(-̶1̶0̶1̶)-------200---(-̶2̶0̶1̶)-----300------301-------------

问题是:

问题:

是否始终保证 2 会在 (101) 之前到达?与 3 在 (201) 之前到达的一样吗?

我的意思是 - 如果我不打算查看时间线,那么出现下图是完全合法的:

----1---------------2---------------3------------------------------|

░░░░░░░░flatMapLatest(x=>Rx.Observable.range(x*100, 2))░░░░░░░░
-----100-------101------200---201-----300------301-------------

2 略有延迟到达的地方,101 已经发出

我在这里错过了什么?管道在这里是如何工作的?

最佳答案

对于具有特定 RxJS 版本的特定 Observable 链,发射顺序将始终相同。

如前所述,在 RxJS 4 中,它使用 currentThread 调度程序,如您所见:https://github.com/Reactive-Extensions/RxJS/blob/master/src/core/perf/operators/range.js#L39 .
所有调度器(除了来自 RxJS 4 的 immediate)都是 internally using some type of a queue所以顺序总是一样的。

事件的顺序与您在图表中显示的非常相似(...或者至少我认为是这样):

  1. 1 已安排并发出,因为它是队列中的唯一操作。
  2. 100 已安排。此时调度程序的队列中没有更多操作,因为 2 尚未安排。 RangeObservable schedules another emission recursively after it calls onNext() .这意味着 100 排在 2 之前。
  3. 2 is scheduled .
  4. 100 已发出,101 已安排
  5. 2 被发出,101 被释放。
  6. ...等等

请注意,此行为在 RxJS 4 和 RxJS 5 中有所不同。

在 RxJS 5 中,大多数 Observables 和运算符默认情况下不使用任何调度器(一个明显的异常(exception)是需要处理延迟的 Observables/运算符)。所以在 RxJS 5 the RangeObservable won't schedule anything并在循环中立即开始发射值。

相同的例子在 RxJS 5 中会产生不同的结果:

const source = Observable
.range(1, 3)
.switchMap(function (x) {
return Observable.range(x * 100, 2);
});

source.subscribe(value => console.log('I got a value ', value));

这将打印以下内容:

I got a value  100
I got a value 101
I got a value 200
I got a value 201
I got a value 300
I got a value 301

但是,如果您添加例如 delay(0),这将发生显着变化。常识表明这不应该做任何事情:

const source = Observable
.range(1, 3)
.switchMap(function (x) {
return Observable.range(x * 100, 2).delay(0);
});

source.subscribe(value => console.log('I got a value ', value));

现在只有内部的 RangeObservable 被重新安排和处理了几次,这使得它只发出来自最后一个 RangeObservable 的值:

I got a value  300
I got a value 301

关于javascript - RXJS中异步流是如何传输的?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43419163/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com