gpt4 book ai didi

javascript - 如何一次处理 RxJS 流 n 个项目,并在完成一个项目后,再次自动填充回 n 个?

转载 作者:行者123 更新时间:2023-11-29 19:11:25 26 4
gpt4 key购买 nike

我有一个事件流,我想调用一个函数为每个事件返回一个 promise ,问题是这个函数非常昂贵,所以我想一次最多处理 n 个事件.

这个卵石图可能是错误的,但这是我想要的:

---x--x--xxxxxxx-------------x------------->  //Events
---p--p--pppp------p-p-p-----p-------------> //In Progress
-------d--d--------d-d-dd------dddd--------> //Promise Done

---1--21-2-34-----------3----4-3210-------- //QUEUE SIZE CHANGES

这是我目前的代码:

var n = 4;
var inProgressCount = 0;

var events$ = Rx.Observable.fromEvent(produceEvent, 'click')
.map((ev) => new Date().getTime());

var inProgress$ = events$.controlled();

var done$ = inProgress$
.tap(() => inProgressCount++)
.flatMap((timestamp) => Rx.Observable.fromPromise(expensiveComputation(getRandomInt(1, 5) * 1000, timestamp)));

done$.subscribeOnNext((timestamp) => {
inProgressCount--;
inProgress$.request(Math.max(1, n - inProgressCount));
});

inProgress$.request(n);

这段代码有两个问题:

  1. 它正在使用 inProgressCount 变量,它随 side 更新效果函数。
  2. 仅当我从受控流中请求超过 1 个项目时,done$ 订阅才会被调用一次。这使得 inProgressCount var 更新不正确,这最终将队列一次限制为一个。

你可以看到它在这里工作: http://jsbin.com/wivehonifi/1/edit?js,console,output

问题:

  1. 有更好的方法吗?
  2. 如何去掉 inProgressCount 变量?
  3. 为什么 done$ 订阅在请求多个项目时只被调用一次?

更新:
问题 #3 的答案:switchMap 与 flatMapLatest 相同,所以这就是为什么我只得到最后一个。将代码更新为 flatMap 而不是 switchMap。

最佳答案

您实际上根本不需要使用背压。有一个名为 flatMapWithMaxConcurrent 的运算符可以为您执行此操作。它本质上是调用 .map().merge(concurrency) 的别名,并且一次只允许最大数量的流在运行。

我在这里更新了你的 jsbin:http://jsbin.com/weheyuceke/1/edit?js,output

但我在下面注释了重要的一点:

const concurrency = 4;

var done$ = events$
//Only allows a maximum number of items to be subscribed to at a time
.flatMapWithMaxConcurrent(concurrency,
({timestamp}) =>
//This overload of `fromPromise` defers the execution of the lambda
//until subscription
Rx.Observable.fromPromise(() => {
//Notify the ui that this task is in progress
updatePanelAppend(inProgress, timestamp);
removeFromPanel(pending, timestamp);
//return the task
return expensiveComputation(getRandomInt(1, 5) * 1000, timestamp)
}));

关于javascript - 如何一次处理 RxJS 流 n 个项目,并在完成一个项目后,再次自动填充回 n 个?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38601451/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com