gpt4 book ai didi

javascript - mergeMap 运算符的用例是什么?

转载 作者:IT王子 更新时间:2023-10-29 03:04:55 24 4
gpt4 key购买 nike

我完全不明白 mergeMap 的用途。我听说有两种解释:

  1. 这就像 .NET LINQ 中的 SelectAll() - 不。
  2. 它是 RxJS mergemap 的组合 - 不(或者我无法复制它)。

考虑 following code :

    var obs1 = new Rx.Observable.interval(1000);
var obs2 = new Rx.Observable.interval(1000);

//Just a merge and a map, works fine
obs1.merge(obs2).map(x=> x+'a').subscribe(
next => console.log(next)
)

//Who know what - seems to do the same thing as a plain map on 1 observable
obs1.mergeMap(val => Rx.Observable.of(val + `B`))
.subscribe(
next => console.log(next)
)

最后一 block 标有“Who knows what”的东西只不过是 obs1 上的 map - 有什么意义?

mergeMap 实际上做了什么?什么是有效用例的示例? (最好有一些代码)

对我没有任何帮助的文章(上面的 mergeMap 代码来自其中之一):1 , 2

最佳答案

tl;dr; mergeMapmap 更强大。了解 mergeMap 是获得 Rx 全部功能的必要条件。


相似点

  • mergeMapmap 都作用于单个流(相对于 zipcombineLatest)

  • mergeMapmap 都可以转换流的元素(相对于 filterdelay )

差异

map

  • 不能改变源流的大小(假设:map 本身不throw);对于源中的每个元素,只发出一个 mapped 元素; map 不能忽略元素(例如 filter);

  • 在默认调度程序的情况下,转换同步发生;要 100% 清楚:源流可能会异步传送其元素,但每个下一个元素都会立即被 mapped 并进一步重新发出; map 无法及时移动元素,例如 delay

  • 返回值没有限制

  • id: x => x

合并 map

  • 可以改变源流的大小;对于每个元素,可能会创建/发出任意数量(0、1 或许多)的新元素

  • 它提供了对异步性的完全控制——无论是何时创建/发出新元素,还是应同时处理源流中的多少元素;例如,假设源流发射了 10 个元素,但是 maxConcurrency 设置为 2,那么前两个元素将被立即处理,其余 8 个元素被缓冲;一旦处理过的 completed 中的一个元素将被处理,源流中的下一个元素将被处理等等 - 这有点棘手,但请看下面的示例

  • 所有其他运算符都可以仅使用 mergeMapObservable 构造函数实现

  • 可用于递归异步操作

  • 返回值必须是 Observable 类型(或者 Rx 必须知道如何从中创建 observable - 例如 promise、array)

  • id: x => Rx.Observable.of(x)

数组类比

let array = [1,2,3]
fn map mergeMap
x => x*x [1,4,9] error /*expects array as return value*/
x => [x,x*x] [[1,1],[2,4],[3,9]] [1,1,2,4,3,9]

这个类比并没有显示全貌,它基本上对应于 .mergeMap 并且 maxConcurrency 设置为 1。在这种情况下,元素将按上述顺序排列,但在一般情况下不必如此。我们唯一的保证是新元素的发射将按照它们在底层流中的位置排序。例如:[3,1,2,4,9,1][2,3,1,1,9,4] 是有效的,但是 [1,1,4,2,3,9] 不是(因为 4 在基础流中的 2 之后发出)。

几个使用 mergeMap 的例子:

// implement .map with .mergeMap
Rx.Observable.prototype.mapWithMergeMap = function(mapFn) {
return this.mergeMap(x => Rx.Observable.of(mapFn(x)));
}

Rx.Observable.range(1, 3)
.mapWithMergeMap(x => x * x)
.subscribe(x => console.log('mapWithMergeMap', x))

// implement .filter with .mergeMap
Rx.Observable.prototype.filterWithMergeMap = function(filterFn) {
return this.mergeMap(x =>
filterFn(x) ?
Rx.Observable.of(x) :
Rx.Observable.empty()); // return no element
}

Rx.Observable.range(1, 3)
.filterWithMergeMap(x => x === 3)
.subscribe(x => console.log('filterWithMergeMap', x))

// implement .delay with .mergeMap
Rx.Observable.prototype.delayWithMergeMap = function(delayMs) {
return this.mergeMap(x =>
Rx.Observable.create(obs => {
// setTimeout is naive - one should use scheduler instead
const token = setTimeout(() => {
obs.next(x);
obs.complete();
}, delayMs)
return () => clearTimeout(token);
}))
}

Rx.Observable.range(1, 3)
.delayWithMergeMap(500)
.take(2)
.subscribe(x => console.log('delayWithMergeMap', x))

// recursive count
const count = (from, to, interval) => {
if (from > to) return Rx.Observable.empty();
return Rx.Observable.timer(interval)
.mergeMap(() =>
count(from + 1, to, interval)
.startWith(from))
}

count(1, 3, 1000).subscribe(x => console.log('count', x))

// just an example of bit different implementation with no returns
const countMoreRxWay = (from, to, interval) =>
Rx.Observable.if(
() => from > to,
Rx.Observable.empty(),
Rx.Observable.timer(interval)
.mergeMap(() => countMoreRxWay(from + 1, to, interval)
.startWith(from)))

const maxConcurrencyExample = () =>
Rx.Observable.range(1,7)
.do(x => console.log('emitted', x))
.mergeMap(x => Rx.Observable.timer(1000).mapTo(x), 2)
.do(x => console.log('processed', x))
.subscribe()

setTimeout(maxConcurrencyExample, 3100)
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.1.1/Rx.min.js"></script>

关于javascript - mergeMap 运算符的用例是什么?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42120680/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com