- iOS/Objective-C 元类和类别
- objective-c - -1001 错误,当 NSURLSession 通过 httpproxy 和/etc/hosts
- java - 使用网络类获取 url 地址
- ios - 推送通知中不播放声音
有没有什么方法可以按照外部可观察对象的原始顺序使用 mergeMap
的结果,同时仍然允许内部可观察对象并行运行?
让我们看一下两个合并映射运算符:
...它需要一个映射回调,以及可以并发运行的内部可观察量的数量:
of(1, 2, 3, 4, 5, 6).pipe(
mergeMap(number => api.get('/double', { number }), 3)
);
在此处查看实际操作:https://codepen.io/JosephSilber/pen/YzwVYNb?editors=1010
这将分别为 1
、2
和 3
触发 3 个并行请求。一旦其中一个请求完成,它就会触发另一个对 4
的请求。依此类推,始终保持 3 个并发请求,直到处理完所有值。
但是,由于先前的请求可能在后续请求之前完成,因此生成的值可能是乱序的。所以不是:
[2, 4, 6, 8, 10, 12]
...我们实际上可能得到:
[4, 2, 8, 10, 6, 12] // or any other permutation
...输入concatMap
。此运算符确保可观察对象全部按原始顺序连接,以便:
of(1, 2, 3, 4, 5, 6).pipe(
concatMap(number => api.get('/double', { number }))
);
...将始终产生:
[2, 4, 6, 8, 10, 12]
在此处查看实际操作:https://codepen.io/JosephSilber/pen/OJMmzpy?editors=1010
这就是我们想要的,但现在请求不会并行运行。作为the documentation说:
concatMap
is equivalent tomergeMap
withconcurrency
parameter set to1
.
回到问题:是否有可能获得 mergeMap
的好处,从而可以并行运行给定数量的请求,同时仍然具有映射值以原始顺序发出?
以上是对问题的抽象描述。当您知道手头的实际问题时,有时更容易推理问题,所以这里是:
我有一份必须发货的订单 list :
const orderNumbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
我有一个实际运送订单的 shipOrder
方法。它返回一个 Promise
:
const shipOrder = orderNumber => api.shipOrder(orderNumber);
API 最多只能同时处理 5 个订单发货,所以我使用 mergeMap
来处理:
from(orderNumbers).pipe(
mergeMap(orderNumber => shipOrder(orderNumber), 5)
);
订单发货后,我们需要打印其发货标签。我有一个 printShippingLabel
函数,根据已发货订单的订单号,该函数将打印其发货标签。所以我订阅了我们的 observable,并在输入值时打印运输标签:
from(orderNumbers)
.pipe(mergeMap(orderNumber => shipOrder(orderNumber), 5))
.pipe(orderNumber => printShippingLabel(orderNumber));
这行得通,但是现在运输标签打印顺序不对,因为 mergeMap
根据 shipOrder
完成的时间发出值它的请求。我想要的是按照与原始列表相同的顺序打印标签。
这可能吗?
请参阅此处了解问题:https://codepen.io/JosephSilber/pen/YzwVYZb?editors=1010
您可以看到,较早的订单甚至在较晚的订单发货之前就已开始打印。
最佳答案
我确实设法部分解决了它,所以我将它张贴在这里作为我自己问题的答案。
我仍然非常想知道处理这种情况的规范方法。
创建一个自定义运算符,它采用具有索引键的值({ index: number }
Typescript 说法),并保留值的缓冲区,仅根据它们的 index
顺序发出它们。
将原始列表映射到嵌入了索引
的对象列表。
将其传递给我们的自定义 sortByIndex
运算符。
将值映射回它们的原始值。
这是 sortByIndex
的样子:
function sortByIndex() {
return observable => {
return Observable.create(subscriber => {
const buffer = new Map();
let current = 0;
return observable.subscribe({
next: value => {
if (current != value.index) {
buffer.set(value.index, value);
} else {
subscriber.next(value);
while (buffer.has(++current)) {
subscriber.next(buffer.get(current));
buffer.delete(current);
}
}
},
complete: value => subscriber.complete(),
});
});
};
}
有了 sortByIndex
运算符,我们现在可以完成整个管道:
of(1, 2, 3, 4, 5, 6).pipe(
map((number, index) => ({ number, index })),
mergeMap(async ({ number, index }) => {
const doubled = await api.get('/double', { number });
return { index, number: doubled };
}, 3),
sortByIndex(),
map(({ number }) => number)
);
在此处查看实际操作:https://codepen.io/JosephSilber/pen/zYrwpNj?editors=1010
concurrentConcat
操作符事实上,有了这个 sortByIndex
运算符,我们现在可以创建一个通用的 concurrentConcat
运算符,它将对 { 索引进行转换: number, value: T
内部类型:
function concurrentConcat(mapper, parallel) {
return observable => {
return observable.pipe(
mergeMap(
mapper,
(_, value, index) => ({ value, index }),
parallel
),
sortByIndex(),
map(({ value }) => value)
);
};
}
然后我们可以使用此 concurrentConcat
运算符代替 mergeMap
,它现在将按其原始顺序发出值:
of(1, 2, 3, 4, 5, 6).pipe(
concurrentConcat(number => api.get('/double', { number }), 3),
);
在此处查看实际操作:https://codepen.io/JosephSilber/pen/pogPpRP?editors=1010
所以要解决我最初的订单发货问题:
from(orderNumbers)
.pipe(concurrentConcat(orderNumber => shipOrder(orderNumber), maxConcurrent))
.subscribe(orderNumber => printShippingLabel(orderNumber));
在此处查看实际操作:https://codepen.io/JosephSilber/pen/rNxmpWp?editors=1010
您可以看到,即使较晚的订单最终可能会先于较早的订单发货,但标签始终按原始订单打印。
这个解决方案甚至还不完整(因为它不处理发出多个值的内部可观察对象),但它需要一堆自定义代码。这是一个如此普遍的问题,我觉得必须有一种更简单的(内置的)方法来解决这个问题:|
关于javascript - 具有原始顺序的 RxJS mergeMap(),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57045892/
RxJS 中是否有一个运算符可以让我缓冲项目并在信号可观察对象触发时将它们一个一个地放出?有点像bufferWhen,但不是在每个信号上转储整个缓冲区,而是每个信号转储一定数量。它甚至可以转储信号 o
我正在像这样创建我的可观察源(每 5 秒调用一次 api): const obs$ = Observable.interval(5000).switchMap(() => makeApiCall())
我有一个 Action ,然后将触发一个ajax请求。 如果由于某种原因操作失败,我什么也不想做。我没有执行可以执行的无操作功能,而不是创建只返回先前状态的空白操作? export default f
在以下代码中:- RxJS.Observable.of(1,2).first().subscribe((x) => console.log(x);); 给定运营商 first() 是否有必要取消订阅?
我有一种情况,可以在很短的时间内将很多事件发送到流中。我想要一个运算符,它是ojit_code和debounceTime的混合体。 以下演示可用于说明我想拥有的https://stackblitz
我的用例如下:我得到事件,有时会突然发生。如果发生突发,我只需要处理一次即可。去抖动会执行此操作。 但是,去抖动仅给我提供连拍的最后一个元素,但我需要了解连拍中的所有元素才能汇总(使用平面图)。 这可
简化以下代码示例的方法是什么? 我找不到合适的运算符..有人可以举一个简短的例子吗? this.returnsObservable1(...) .subscribe( success =>
在RxJS 6中,如何导入静态合并功能以合并Observable列表? 我希望能够做到: const merged$ = merge( obs1$, obs2$, obs3$
我正在阅读 RxJS 的官方文档,然后我意识到它们都在做完全相同的事情。 对我来说,它们看起来完全相似。 如果有区别请指出。 最佳答案 我将根据它们的 Time 版本来描述它们之间的区别,因为这是我最
我对基本的 RxJS 概念有点熟悉,比如 Observables、Observers 和 Subjects,但是 RxJS Notifications概念对我来说是全新的。 它有什么用?我应该什么时候
从 rxjs 6.5 切换到 rxjs 7 后,我遇到了这个奇怪的错误。我不确定这是 rxjs 7 的类型问题还是 stackblitz ( https://stackblitz.com/edit/r
以前我只能使用此代码导入使用过的运算符: import 'rxjs/Observable'; import 'rxjs/add/operator/map'; import 'rxjs/add/oper
combineLatest 函数可以从 rxjs 和 rxjs/operators 导入。 当我从 rxjs/operators 导入它时(就像我导入 combineAll 我收到以下错误: TS23
我有一系列事件通过 fromEventPattern 进行像这样: fromEventPattern(addEventHandler).subscribe(ps$); 由于业务怪癖,我预计有时会抛出异
我是 rxjs 的新手,无法解决这个问题: 我有两个流: 一个有传入的对象 ---a----b----c----d-----> 一个是从列表中选择的对象 ----------------c---->
如果一个 observable 完成,我是否仍然需要取消订阅/处置(在 RxJS 中)该 observable 以删除 Observer(防止内存泄漏),或者一旦 onComplete 或 onErr
我有这样的订阅: this.test.subscribe(params => { ...some code }); 如果我传递回调函数而不是箭头函数,则缺少上下文。 我想将上下文绑定(bind)到
我有一个可观察的: messages: string[] = ['a', 'b', 'c']; const source = from(messages) 你如何延迟它,所以当有人订阅它时,它
我可以让 observable 触发一次该值。但我希望它在变量的值发生变化时发生。实际上我需要一个观察者。这就是我认为 observable 的意义所在。观察事物的值(value)或状态并更新订阅它的
我有以下代码: const fetchBook = (bookId: number) => { const title = 'Book' + bookId; console.log('
我是一名优秀的程序员,十分优秀!