gpt4 book ai didi

angular - 如何正确地 forkJoin 可观察对象列表或其订阅?

转载 作者:行者123 更新时间:2023-12-02 17:56:56 27 4
gpt4 key购买 nike

我有一个非常大的有序可观察对象列表,需要并行运行。当每个可观察对象返回时,它们将结果附加到行为主题,这就是结果传递的方式。但是,我需要在它们全部完成时调用一个特定的函数。

每个 observables 从 API 下载图像(和相关元数据)。请求需要尽可能快地执行,我需要处理每个结果,因为它发出并在所有可观察对象完成时发出一个空值。这意味着 observables 应该并行执行。

原始实现,完成时没有回调。

const requests: Observable[] = getRequests();

requests.forEach(obs => obs.subscribe(res => {
const currentImages = this.$images.value;
currentImages.push(res);
this.$images.next(currentImages);
}));

为了在所有请求完成时实现回调,我尝试了以下方法。

const requests: Observable[] = getRequests();
const finishedTracker = new Subject<void>();

requests.forEach(obs => obs.subscribe(res => {
const currentImages = this.$images.value;
currentImages.push(res);
this.$images.next(currentImages);
}));

forkJoin(requests).subscribe(() => {
finishedTracker.next();
finishedTracker.complete();
console.log('requests done');
});

这可行,但对我来说似乎很奇怪,我需要拆分请求的 forkJoin 和订阅。有没有更好的方法来实现这个功能?我也查看了 mergeMap,但无法使其正常工作。

编辑根据评论,我意识到订阅两次意味着发出两次请求。因此我尝试了另一种实现方式。

from(requests).pipe(
mergeMap(o => {
o.subscribe(res => {
const currentImages = this.$images.value;
currentImages.push(res);
this.$images.next(currentImages);
}
return o;
}, 10)
).subscribe(() => {
finishedTracker.next();
console.log('requests done');
})

我没有使用 forkJoin 的结果,因为据我所知,它给了我所有请求的结果。因此需要等待他们完成。由于每个请求都比较快,但通常有数百个请求,所以我需要在每个请求完成后立即将它们各自的结果传递给行为主体。

编辑 2 我采用的解决方案。

from(requests).pipe(
mergeMap(request => request, 10),
scan<ImageResponse, ImageResponse[]>((all, current, index) => {
all = all.concat(current);
this.$images.next(all);
return all;
}, [])
).subscribe({
complete: () => {
finishedTracker.next();
console.log('requests done');
}});

最佳答案

没有必要在您的 mergeMap 中订阅。事实上,正如其他人所指出的那样,它导致了双重订阅,因为 mergeMap 在内部订阅了您传递给它的函数返回的可观察值。

要在响应发生时对其进行处理,您只需使用管道并在其中添加处理逻辑即可。由于您本质上是在产生副作用(不会修改当前流的输出),因此使用 tap运营商是合适的:

from(requests).pipe(
mergeMap(o => o.pipe(
tap(res => {
const currentImages = this.$images.value;
currentImages.push(res);
this.$images.next(currentImages);
}),
}, 10)
).subscribe(() => {
finishedTracker.next();
console.log('requests done');
})

虽然这会奏效,但看起来您将可观察的流程复杂化了。我不确定您的用例,但我猜根本不需要主题。如果您的目标是在处理结果时发出累积结果数组,您可以使用 scan为此,不涉及任何 SubjectBehaviorSubject。要在所有请求完成后执行一些逻辑,您可以传递部分 Observer ,它仅指定 complete 回调(而不是 next 回调,当您将函数作为参数传递给 subscribe()):

from(requests).pipe(
mergeMap(request => request, 10),
scan((all, current) => all.concat(current), [])
).subscribe({
complete: () => console.log('requests done')
});

编辑:正如@AdrianBrand 所指出的,使用 merge 更简洁而不是 from/mergeMap:

merge(...requests, 10).pipe(
scan((all, current) => all.concat(current), [])
).subscribe({
complete: () => console.log('requests done')
})

关于angular - 如何正确地 forkJoin 可观察对象列表或其订阅?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/75433820/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com