gpt4 book ai didi

javascript - RxJS - 对 maxWait 和 maxElements 窗口使用 windowWhen()

转载 作者:行者123 更新时间:2023-11-29 21:16:31 24 4
gpt4 key购买 nike

我正在尝试将对服务器的调用捆绑到最大 maxEntries,但不想等待超过 maxWait 毫秒。这曾经在 RxJS 4 中作为 windowWithTimeOrCount() 提供,但已从 RxJS 5 中删除。

除了窗口的最后一个元素丢失外,一切都很好。说到“迷失”——这就是我现在的感受。有哪位 RxJS 大师可以告诉我我哪里做错了吗?

 private chunk(queue: Observable<CacheEntry>, maxEntries: number, maxWait: number): Observable<Observable<CacheEntry>> {

// We have an incoming stream of CacheEntries to be retrieved. We want to bundle these in units of max maxEntries
// but wait no longer than max maxWait ms. We return an Observable, that emits Observables of CacheEntries that
// complete after maxEntries / maxWait (whatever comes first).
const toggleSubject = new Subject<void>();

return queue

// Start emitting a new Observable every time toggleSubject emits.
// (bufferWhen() wouldn't work as we have to count the elements as they come and buffer only gives us the
// complete collection)
.windowWhen(() => toggleSubject)

// map() is called once for every window (maxEntries/maxWait)
// the inner do() is called for every element in the window, allowing us to set up the timeout callback and to
// count all elements, then emitting on toggleSubject, triggering a new Observable.
// (We have to map() here - instead of an outer do() - because otherwise the original obs would be streamed
// and the hooked up version with the inner do() would never be called.)
.map((obs) => {
// counts the number of cacheEntries already in this stream
let count = 0;
// flag to kill the timeout callback
let done = false;
// we have to return an Observable
return obs.do(() => {
count++;
if (count === 1) {
// we start counting when the first element is streamed.
IntervalObservable.create(maxWait).first().subscribe(() => {
if (!done) {
//trigger due to maxWait
toggleSubject.next(null);
}
});
}
if (count > (maxEntries)) {
done = true;
// trigger due due to maxEntries(' + maxEntries + ')');
toggleSubject.next(null);
}
}
);
});
}

由于 if (count > (maxEntries)) 而触发 toggleSubject.next(null) 的元素丢失了(不在任何窗口中)。

编辑: maxTime 在推送新 Observable 的第一个元素时开始计时。 如果(计数 === 1)。这是 a) 我在 map() 中的窗口 Observable 内部工作的原因和 b) 重要的原因,因为这是必需的行为。

示例:maxElements:100,maxWait:100。101 个元素在 t=99 时被推送。预期行为:在 t=99 时,一个包含 100 个元素的 Observable 被推送。剩下 1 个元素。计数器 + 定时器复位。在 t=199 时,第二个“ block ”的计数器到期并推送一个包含 1 个元素的 Observable。

(在这个例子中,Brandons(见答案)代码会——如果我没看错的话——在 t=99 时推送一个包含 100 个元素的 Observable,一毫秒后,在 t=100 时,一个 Observable有一个元素。)

最佳答案

是的,您不想使用 map 来产生这样的副作用。正如您所注意到的,您最终会掉落元素。

这是一个通用方法,我认为它可以满足您的需求。

注意:RXJS 5 目前有一个 issue使用此发布重载的类型定义。我添加了一些应该允许它在 TypeScript 中编译的类型转换。

chunk<T>(queue: Observable<T>, maxEntries: number, maxWait: number): Observable<Observable<T>> {
// use publish() so that we can subscribe multiple times to the same stream of data.
return queue.publish(entries => {
// observable which will trigger after maxWait
const timer = IntervalObservable.create(maxWait);
// observable which will trigger after maxEntries
const limit = entries.take(maxEntries).last();
// observable which will trigger on either condition
const endOfWindow = limit.takeUntil(timer);

// use endOfWindow to close each window.
return entries.windowWhen(() => endOfWindow) as Observable<T>;
}) as Observable<Observable<T>>;
}

编辑:

如果您不想在第一个项目到达每个窗口后才开始计时,那么您可以这样做:

chunk<T>(queue: Observable<T>, maxEntries: number, maxWait: number): Observable<Observable<T>> {
// use publish() so that we can subscribe multiple times to the same stream of data.
return queue.publish(entries => {
// observable which will trigger after maxWait after the first
// item in this window arrives:
const timer = entries.take(1).delay(maxWait);
// observable which will trigger after maxEntries
const limit = entries.take(maxEntries).last();
// observable which will trigger on either condition
const endOfWindow = limit.takeUntil(timer);

// use endOfWindow to close each window.
return entries.windowWhen(() => endOfWindow) as Observable<T>;
}) as Observable<Observable<T>>;
}

关于javascript - RxJS - 对 maxWait 和 maxElements 窗口使用 windowWhen(),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39209560/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com