- iOS/Objective-C 元类和类别
- objective-c - -1001 错误,当 NSURLSession 通过 httpproxy 和/etc/hosts
- java - 使用网络类获取 url 地址
- ios - 推送通知中不播放声音
有没有什么方法可以按照外部可观察对象的原始顺序使用 mergeMap
的结果,同时仍然允许内部可观察对象并行运行?
让我们看一下两个合并映射运算符:
...它需要一个映射回调,以及可以并发运行的内部可观察量的数量:
of(1, 2, 3, 4, 5, 6).pipe(
mergeMap(number => api.get('/double', { number }), 3)
);
在此处查看实际操作:https://codepen.io/JosephSilber/pen/YzwVYNb?editors=1010
这将分别为 1
、2
和 3
触发 3 个并行请求。一旦其中一个请求完成,它就会触发另一个对 4
的请求。依此类推,始终保持 3 个并发请求,直到处理完所有值。
但是,由于先前的请求可能在后续请求之前完成,因此生成的值可能是乱序的。所以不是:
[2, 4, 6, 8, 10, 12]
...我们实际上可能得到:
[4, 2, 8, 10, 6, 12] // or any other permutation
...输入concatMap
。此运算符确保可观察对象全部按原始顺序连接,以便:
of(1, 2, 3, 4, 5, 6).pipe(
concatMap(number => api.get('/double', { number }))
);
...将始终产生:
[2, 4, 6, 8, 10, 12]
在此处查看实际操作:https://codepen.io/JosephSilber/pen/OJMmzpy?editors=1010
这就是我们想要的,但现在请求不会并行运行。作为the documentation说:
concatMap
is equivalent tomergeMap
withconcurrency
parameter set to1
.
回到问题:是否有可能获得 mergeMap
的好处,从而可以并行运行给定数量的请求,同时仍然具有映射值以原始顺序发出?
以上是对问题的抽象描述。当您知道手头的实际问题时,有时更容易推理问题,所以这里是:
我有一份必须发货的订单 list :
const orderNumbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
我有一个实际运送订单的 shipOrder
方法。它返回一个 Promise
:
const shipOrder = orderNumber => api.shipOrder(orderNumber);
API 最多只能同时处理 5 个订单发货,所以我使用 mergeMap
来处理:
from(orderNumbers).pipe(
mergeMap(orderNumber => shipOrder(orderNumber), 5)
);
订单发货后,我们需要打印其发货标签。我有一个 printShippingLabel
函数,根据已发货订单的订单号,该函数将打印其发货标签。所以我订阅了我们的 observable,并在输入值时打印运输标签:
from(orderNumbers)
.pipe(mergeMap(orderNumber => shipOrder(orderNumber), 5))
.pipe(orderNumber => printShippingLabel(orderNumber));
这行得通,但是现在运输标签打印顺序不对,因为 mergeMap
根据 shipOrder
完成的时间发出值它的请求。我想要的是按照与原始列表相同的顺序打印标签。
这可能吗?
请参阅此处了解问题:https://codepen.io/JosephSilber/pen/YzwVYZb?editors=1010
您可以看到,较早的订单甚至在较晚的订单发货之前就已开始打印。
最佳答案
我确实设法部分解决了它,所以我将它张贴在这里作为我自己问题的答案。
我仍然非常想知道处理这种情况的规范方法。
创建一个自定义运算符,它采用具有索引键的值({ index: number }
Typescript 说法),并保留值的缓冲区,仅根据它们的 index
顺序发出它们。
将原始列表映射到嵌入了索引
的对象列表。
将其传递给我们的自定义 sortByIndex
运算符。
将值映射回它们的原始值。
这是 sortByIndex
的样子:
function sortByIndex() {
return observable => {
return Observable.create(subscriber => {
const buffer = new Map();
let current = 0;
return observable.subscribe({
next: value => {
if (current != value.index) {
buffer.set(value.index, value);
} else {
subscriber.next(value);
while (buffer.has(++current)) {
subscriber.next(buffer.get(current));
buffer.delete(current);
}
}
},
complete: value => subscriber.complete(),
});
});
};
}
有了 sortByIndex
运算符,我们现在可以完成整个管道:
of(1, 2, 3, 4, 5, 6).pipe(
map((number, index) => ({ number, index })),
mergeMap(async ({ number, index }) => {
const doubled = await api.get('/double', { number });
return { index, number: doubled };
}, 3),
sortByIndex(),
map(({ number }) => number)
);
在此处查看实际操作:https://codepen.io/JosephSilber/pen/zYrwpNj?editors=1010
concurrentConcat
操作符事实上,有了这个 sortByIndex
运算符,我们现在可以创建一个通用的 concurrentConcat
运算符,它将对 { 索引进行转换: number, value: T
内部类型:
function concurrentConcat(mapper, parallel) {
return observable => {
return observable.pipe(
mergeMap(
mapper,
(_, value, index) => ({ value, index }),
parallel
),
sortByIndex(),
map(({ value }) => value)
);
};
}
然后我们可以使用此 concurrentConcat
运算符代替 mergeMap
,它现在将按其原始顺序发出值:
of(1, 2, 3, 4, 5, 6).pipe(
concurrentConcat(number => api.get('/double', { number }), 3),
);
在此处查看实际操作:https://codepen.io/JosephSilber/pen/pogPpRP?editors=1010
所以要解决我最初的订单发货问题:
from(orderNumbers)
.pipe(concurrentConcat(orderNumber => shipOrder(orderNumber), maxConcurrent))
.subscribe(orderNumber => printShippingLabel(orderNumber));
在此处查看实际操作:https://codepen.io/JosephSilber/pen/rNxmpWp?editors=1010
您可以看到,即使较晚的订单最终可能会先于较早的订单发货,但标签始终按原始订单打印。
这个解决方案甚至还不完整(因为它不处理发出多个值的内部可观察对象),但它需要一堆自定义代码。这是一个如此普遍的问题,我觉得必须有一种更简单的(内置的)方法来解决这个问题:|
关于javascript - 具有原始顺序的 RxJS mergeMap(),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57045892/
在我的情况下,首先可以并行执行多个请求,在这些请求完成后,另一个请求将与先前的结果一起发送,伪代码看起来像 let uploads$ = [obs1$, obs2$, obs3$]; Observab
我正在尝试使用 mergeMap在 rxjs6我收到了这个错误: Property 'mergeMap' does not exist on type 'Observable' 我试过了import
我完全不明白 mergeMap 的用途。我听说有两种解释: 这就像 .NET LINQ 中的 SelectAll() - 不。 它是 RxJS merge 和 map 的组合 - 不(或者我无法复制它
我有一个 Observable,其中每个新值都应该引起一个 HTTP 请求。在客户端,我只关心最新的响应值;但是,我希望每个请求都能完成以进行监控/等。目的。 我目前拥有的是这样的: function
遵循 Stack Overflow 帖子中描述的最佳实践:Angular RxJS When Should I Unsubscribe From Subscription ,考虑这个 Angular
当我使用 Angular HttpClient 发出 GET 请求时,我会得到一个 observable 并在 RxJS 运算符 mergeMap 中处理它。 现在它一次又一次地抛出 404,我想捕获
我有一个可观察的流。第一个运算符是一个 mergeMap,它返回一个可观察值数组。然后我必须有第二个 mergeMap 来从第一个 mergeMap 的返回中获取最终值。我觉得第二个 mergeMap
我有一个可观察的流。第一个运算符是一个 mergeMap,它返回一个可观察值数组。然后我必须有第二个 mergeMap 来从第一个 mergeMap 的返回中获取最终值。我觉得第二个 mergeMap
抽象问题 有没有什么方法可以按照外部可观察对象的原始顺序使用 mergeMap 的结果,同时仍然允许内部可观察对象并行运行? 更详细的解释 让我们看一下两个合并映射运算符: mergeMap ...它
尝试使用 http.get 请求 URL 将经度和纬度从地理位置服务提取到 list.service。 第一期: businesses Property 'businesses' does not e
TLDR:工作示例在这个问题的最后一个代码块中。查看 @bryan60 的答案,了解使用 concat 而不是 mergeMap 的工作示例。 我试图按顺序运行多个远程请求,但只执行了第一个可观察对象
我有两个相同效果的实现,并且都有效。我很难理解两者之间的差异,哪个更“正确”。 请在下面找到它们: 选项 1. IDE 无法确定 instance 的类型在最后 map . pollingSt
在此 mergeMap 的大理石图中, 你怎么读这个表达式? // Kind of, looks like an em-dash near the end mergeMap(i => 10*i--10
在 rxjs5 文档中,它提到“为了减少多态性并从运算符中获得更好的性能,一些运算符已被拆分为多个运算符”。它的实际含义是什么以及如何使用 mergeMapTo 运算符? 最佳答案 来自docs ,m
我试图理解为什么 Rxjs 的 mergeMap 在传入身份函数时会变平 of(['click', 'tap']) .pipe( mergeMap(_ => _), tap
我试图理解为什么 Rxjs 的 mergeMap 在传入身份函数时会变平 of(['click', 'tap']) .pipe( mergeMap(_ => _), tap
我看了一个关于如何在 2 个可观察对象上使用 mergeMap() 的教程,但我仍然不清楚如何将它与超过 2 个可观察对象一起使用。 教程链接如下: https://www.youtube.com/w
在 api 调用获取数据后,我需要在 FETCH_DATA_SUCCESS 上调用一些进一步的操作。 'RESET_IMAGE_DATA' 和 'INITIALISE_FILTERS' 操作必须在每次
我需要你的帮助:) 我使用了 angular 7 和 Rxjs。为了测试,我使用了 Jest。 我的组件在 2 个 httpClient 之间使用 MergeMap,就像这样: addPost() {
如何将 mergeMap 操作排入队列,以便只有在前一个操作完成后才执行一个又一个操作? 最佳答案 您需要使用 concatMap,而不是 mergeMap。 关于javascript - RxJs
我是一名优秀的程序员,十分优秀!