gpt4 book ai didi

javascript - 具有原始顺序的 RxJS mergeMap()

转载 作者:塔克拉玛干 更新时间:2023-11-02 21:08:18 24 4
gpt4 key购买 nike

抽象问题

有没有什么方法可以按照外部可观察对象的原始顺序使用 mergeMap 的结果,同时仍然允许内部可观察对象并行运行?


更详细的解释

让我们看一下两个合并映射运算符:

  • mergeMap

    ...它需要一个映射回调,以及可以并发运行的内部可观察量的数量:

      of(1, 2, 3, 4, 5, 6).pipe(
    mergeMap(number => api.get('/double', { number }), 3)
    );

    在此处查看实际操作:https://codepen.io/JosephSilber/pen/YzwVYNb?editors=1010

    这将分别为 123 触发 3 个并行请求。一旦其中一个请求完成,它就会触发另一个对 4 的请求。依此类推,始终保持 3 个并发请求,直到处理完所有值。

    但是,由于先前的请求可能在后续请求之前完成,因此生成的值可能是乱序的。所以不是:

      [2, 4, 6, 8, 10, 12]

    ...我们实际上可能得​​到:

      [4, 2, 8, 10, 6, 12] // or any other permutation
  • concatMap

    ...输入concatMap。此运算符确保可观察对象全部按原始顺序连接,以便:

      of(1, 2, 3, 4, 5, 6).pipe(
    concatMap(number => api.get('/double', { number }))
    );

    ...将始终产生:

      [2, 4, 6, 8, 10, 12]

    在此处查看实际操作:https://codepen.io/JosephSilber/pen/OJMmzpy?editors=1010

    这就是我们想要的,但现在请求不会并行运行。作为the documentation说:

    concatMap is equivalent to mergeMap with concurrency parameter set to 1.

回到问题:是否有可能获得 mergeMap 的好处,从而可以并行运行给定数量的请求,同时仍然具有映射值以原始顺序发出?


我的具体问题

以上是对问题的抽象描述。当您知道手头的实际问题时,有时更容易推理问题,所以这里是:

  1. 我有一份必须发货的订单 list :

     const orderNumbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
  2. 我有一个实际运送订单的 shipOrder 方法。它返回一个 Promise:

     const shipOrder = orderNumber => api.shipOrder(orderNumber);
  3. API 最多只能同时处理 5 个订单发货,所以我使用 mergeMap 来处理:

     from(orderNumbers).pipe(
    mergeMap(orderNumber => shipOrder(orderNumber), 5)
    );
  4. 订单发货后,我们需要打印其发货标签。我有一个 printShippingLabel 函数,根据已发货订单的订单号,该函数将打印其发货标签。所以我订阅了我们的 observable,并在输入值时打印运输标签:

     from(orderNumbers)
    .pipe(mergeMap(orderNumber => shipOrder(orderNumber), 5))
    .pipe(orderNumber => printShippingLabel(orderNumber));
  5. 这行得通,但是现在运输标签打印顺序不对,因为 mergeMap 根据 shipOrder 完成的时间发出值它的请求。我想要的是按照与原始列表相同的顺序打印标签

这可能吗?


可视化

请参阅此处了解问题:https://codepen.io/JosephSilber/pen/YzwVYZb?editors=1010

您可以看到,较早的订单甚至在较晚的订单发货之前就已开始打印。

最佳答案

我确实设法部分解决了它,所以我将它张贴在这里作为我自己问题的答案。

我仍然非常想知道处理这种情况的规范方法。


复杂的解决方案

  1. 创建一个自定义运算符,它采用具有索引键的值({ index: number } Typescript 说法),并保留值的缓冲区,仅根据它们的 index 顺序发出它们。

  2. 将原始列表映射到嵌入了索引的对象列表。

  3. 将其传递给我们的自定义 sortByIndex 运算符。

  4. 将值映射回它们的原始值。

这是 sortByIndex 的样子:

function sortByIndex() {
return observable => {
return Observable.create(subscriber => {
const buffer = new Map();
let current = 0;
return observable.subscribe({
next: value => {
if (current != value.index) {
buffer.set(value.index, value);
} else {
subscriber.next(value);

while (buffer.has(++current)) {
subscriber.next(buffer.get(current));
buffer.delete(current);
}
}
},
complete: value => subscriber.complete(),
});
});
};
}

有了 sortByIndex 运算符,我们现在可以完成整个管道:

of(1, 2, 3, 4, 5, 6).pipe(
map((number, index) => ({ number, index })),
mergeMap(async ({ number, index }) => {
const doubled = await api.get('/double', { number });
return { index, number: doubled };
}, 3),
sortByIndex(),
map(({ number }) => number)
);

在此处查看实际操作:https://codepen.io/JosephSilber/pen/zYrwpNj?editors=1010

创建一个concurrentConcat操作符

事实上,有了这个 sortByIndex 运算符,我们现在可以创建一个通用的 concurrentConcat 运算符,它将对 { 索引进行转换: number, value: T 内部类型:

function concurrentConcat(mapper, parallel) {
return observable => {
return observable.pipe(
mergeMap(
mapper,
(_, value, index) => ({ value, index }),
parallel
),
sortByIndex(),
map(({ value }) => value)
);
};
}

然后我们可以使用此 concurrentConcat 运算符代替 mergeMap,它现在将按其原始顺序发出值:

of(1, 2, 3, 4, 5, 6).pipe(
concurrentConcat(number => api.get('/double', { number }), 3),
);

在此处查看实际操作:https://codepen.io/JosephSilber/pen/pogPpRP?editors=1010

所以要解决我最初的订单发货问题:

from(orderNumbers)
.pipe(concurrentConcat(orderNumber => shipOrder(orderNumber), maxConcurrent))
.subscribe(orderNumber => printShippingLabel(orderNumber));

在此处查看实际操作:https://codepen.io/JosephSilber/pen/rNxmpWp?editors=1010

您可以看到,即使较晚的订单最终可能会先于较早的订单发货,但标签始终按原始订单打印。


结论

这个解决方案甚至还不完整(因为它不处理发出多个值的内部可观察对象),但它需要一堆自定义代码。这是一个如此普遍的问题,我觉得必须有一种更简单的(内置的)方法来解决这个问题:|

关于javascript - 具有原始顺序的 RxJS mergeMap(),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57045892/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com