- r - 以节省内存的方式增长 data.frame
- ruby-on-rails - ruby/ruby on rails 内存泄漏检测
- android - 无法解析导入android.support.v7.app
- UNIX 域套接字与共享内存(映射文件)
我完全不明白 mergeMap
的用途。我听说有两种解释:
SelectAll()
- 不。merge
和 map
的组合 - 不(或者我无法复制它)。考虑 following code :
var obs1 = new Rx.Observable.interval(1000);
var obs2 = new Rx.Observable.interval(1000);
//Just a merge and a map, works fine
obs1.merge(obs2).map(x=> x+'a').subscribe(
next => console.log(next)
)
//Who know what - seems to do the same thing as a plain map on 1 observable
obs1.mergeMap(val => Rx.Observable.of(val + `B`))
.subscribe(
next => console.log(next)
)
最后一 block 标有“Who knows what”的东西只不过是 obs1
上的 map - 有什么意义?
mergeMap
实际上做了什么?什么是有效用例的示例? (最好有一些代码)
最佳答案
tl;dr; mergeMap
比 map
更强大。了解 mergeMap
是获得 Rx 全部功能的必要条件。
mergeMap
和 map
都作用于单个流(相对于 zip
、combineLatest
)
mergeMap
和 map
都可以转换流的元素(相对于 filter
、delay
)
不能改变源流的大小(假设:map
本身不throw
);对于源中的每个元素,只发出一个 mapped
元素; map
不能忽略元素(例如 filter
);
在默认调度程序的情况下,转换同步发生;要 100% 清楚:源流可能会异步传送其元素,但每个下一个元素都会立即被 mapped
并进一步重新发出; map
无法及时移动元素,例如 delay
返回值没有限制
id
: x => x
可以改变源流的大小;对于每个元素,可能会创建/发出任意数量(0、1 或许多)的新元素
它提供了对异步性的完全控制——无论是何时创建/发出新元素,还是应同时处理源流中的多少元素;例如,假设源流发射了 10 个元素,但是 maxConcurrency
设置为 2,那么前两个元素将被立即处理,其余 8 个元素被缓冲;一旦处理过的 complete
d 中的一个元素将被处理,源流中的下一个元素将被处理等等 - 这有点棘手,但请看下面的示例
所有其他运算符都可以仅使用 mergeMap
和 Observable
构造函数实现
可用于递归异步操作
返回值必须是 Observable 类型(或者 Rx 必须知道如何从中创建 observable - 例如 promise、array)
id
: x => Rx.Observable.of(x)
let array = [1,2,3]
fn map mergeMap
x => x*x [1,4,9] error /*expects array as return value*/
x => [x,x*x] [[1,1],[2,4],[3,9]] [1,1,2,4,3,9]
这个类比并没有显示全貌,它基本上对应于 .mergeMap
并且 maxConcurrency
设置为 1。在这种情况下,元素将按上述顺序排列,但在一般情况下不必如此。我们唯一的保证是新元素的发射将按照它们在底层流中的位置排序。例如:[3,1,2,4,9,1]
和 [2,3,1,1,9,4]
是有效的,但是 [1,1,4,2,3,9]
不是(因为 4
在基础流中的 2
之后发出)。
mergeMap
的例子:// implement .map with .mergeMap
Rx.Observable.prototype.mapWithMergeMap = function(mapFn) {
return this.mergeMap(x => Rx.Observable.of(mapFn(x)));
}
Rx.Observable.range(1, 3)
.mapWithMergeMap(x => x * x)
.subscribe(x => console.log('mapWithMergeMap', x))
// implement .filter with .mergeMap
Rx.Observable.prototype.filterWithMergeMap = function(filterFn) {
return this.mergeMap(x =>
filterFn(x) ?
Rx.Observable.of(x) :
Rx.Observable.empty()); // return no element
}
Rx.Observable.range(1, 3)
.filterWithMergeMap(x => x === 3)
.subscribe(x => console.log('filterWithMergeMap', x))
// implement .delay with .mergeMap
Rx.Observable.prototype.delayWithMergeMap = function(delayMs) {
return this.mergeMap(x =>
Rx.Observable.create(obs => {
// setTimeout is naive - one should use scheduler instead
const token = setTimeout(() => {
obs.next(x);
obs.complete();
}, delayMs)
return () => clearTimeout(token);
}))
}
Rx.Observable.range(1, 3)
.delayWithMergeMap(500)
.take(2)
.subscribe(x => console.log('delayWithMergeMap', x))
// recursive count
const count = (from, to, interval) => {
if (from > to) return Rx.Observable.empty();
return Rx.Observable.timer(interval)
.mergeMap(() =>
count(from + 1, to, interval)
.startWith(from))
}
count(1, 3, 1000).subscribe(x => console.log('count', x))
// just an example of bit different implementation with no returns
const countMoreRxWay = (from, to, interval) =>
Rx.Observable.if(
() => from > to,
Rx.Observable.empty(),
Rx.Observable.timer(interval)
.mergeMap(() => countMoreRxWay(from + 1, to, interval)
.startWith(from)))
const maxConcurrencyExample = () =>
Rx.Observable.range(1,7)
.do(x => console.log('emitted', x))
.mergeMap(x => Rx.Observable.timer(1000).mapTo(x), 2)
.do(x => console.log('processed', x))
.subscribe()
setTimeout(maxConcurrencyExample, 3100)
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.1.1/Rx.min.js"></script>
关于javascript - mergeMap 运算符的用例是什么?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42120680/
在我的情况下,首先可以并行执行多个请求,在这些请求完成后,另一个请求将与先前的结果一起发送,伪代码看起来像 let uploads$ = [obs1$, obs2$, obs3$]; Observab
我正在尝试使用 mergeMap在 rxjs6我收到了这个错误: Property 'mergeMap' does not exist on type 'Observable' 我试过了import
我完全不明白 mergeMap 的用途。我听说有两种解释: 这就像 .NET LINQ 中的 SelectAll() - 不。 它是 RxJS merge 和 map 的组合 - 不(或者我无法复制它
我有一个 Observable,其中每个新值都应该引起一个 HTTP 请求。在客户端,我只关心最新的响应值;但是,我希望每个请求都能完成以进行监控/等。目的。 我目前拥有的是这样的: function
遵循 Stack Overflow 帖子中描述的最佳实践:Angular RxJS When Should I Unsubscribe From Subscription ,考虑这个 Angular
当我使用 Angular HttpClient 发出 GET 请求时,我会得到一个 observable 并在 RxJS 运算符 mergeMap 中处理它。 现在它一次又一次地抛出 404,我想捕获
我有一个可观察的流。第一个运算符是一个 mergeMap,它返回一个可观察值数组。然后我必须有第二个 mergeMap 来从第一个 mergeMap 的返回中获取最终值。我觉得第二个 mergeMap
我有一个可观察的流。第一个运算符是一个 mergeMap,它返回一个可观察值数组。然后我必须有第二个 mergeMap 来从第一个 mergeMap 的返回中获取最终值。我觉得第二个 mergeMap
抽象问题 有没有什么方法可以按照外部可观察对象的原始顺序使用 mergeMap 的结果,同时仍然允许内部可观察对象并行运行? 更详细的解释 让我们看一下两个合并映射运算符: mergeMap ...它
尝试使用 http.get 请求 URL 将经度和纬度从地理位置服务提取到 list.service。 第一期: businesses Property 'businesses' does not e
TLDR:工作示例在这个问题的最后一个代码块中。查看 @bryan60 的答案,了解使用 concat 而不是 mergeMap 的工作示例。 我试图按顺序运行多个远程请求,但只执行了第一个可观察对象
我有两个相同效果的实现,并且都有效。我很难理解两者之间的差异,哪个更“正确”。 请在下面找到它们: 选项 1. IDE 无法确定 instance 的类型在最后 map . pollingSt
在此 mergeMap 的大理石图中, 你怎么读这个表达式? // Kind of, looks like an em-dash near the end mergeMap(i => 10*i--10
在 rxjs5 文档中,它提到“为了减少多态性并从运算符中获得更好的性能,一些运算符已被拆分为多个运算符”。它的实际含义是什么以及如何使用 mergeMapTo 运算符? 最佳答案 来自docs ,m
我试图理解为什么 Rxjs 的 mergeMap 在传入身份函数时会变平 of(['click', 'tap']) .pipe( mergeMap(_ => _), tap
我试图理解为什么 Rxjs 的 mergeMap 在传入身份函数时会变平 of(['click', 'tap']) .pipe( mergeMap(_ => _), tap
我看了一个关于如何在 2 个可观察对象上使用 mergeMap() 的教程,但我仍然不清楚如何将它与超过 2 个可观察对象一起使用。 教程链接如下: https://www.youtube.com/w
在 api 调用获取数据后,我需要在 FETCH_DATA_SUCCESS 上调用一些进一步的操作。 'RESET_IMAGE_DATA' 和 'INITIALISE_FILTERS' 操作必须在每次
我需要你的帮助:) 我使用了 angular 7 和 Rxjs。为了测试,我使用了 Jest。 我的组件在 2 个 httpClient 之间使用 MergeMap,就像这样: addPost() {
如何将 mergeMap 操作排入队列,以便只有在前一个操作完成后才执行一个又一个操作? 最佳答案 您需要使用 concatMap,而不是 mergeMap。 关于javascript - RxJs
我是一名优秀的程序员,十分优秀!