- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我有一个非常大的有序可观察对象列表,需要并行运行。当每个可观察对象返回时,它们将结果附加到行为主题,这就是结果传递的方式。但是,我需要在它们全部完成时调用一个特定的函数。
每个 observables 从 API 下载图像(和相关元数据)。请求需要尽可能快地执行,我需要处理每个结果,因为它发出并在所有可观察对象完成时发出一个空值。这意味着 observables 应该并行执行。
原始实现,完成时没有回调。
const requests: Observable[] = getRequests();
requests.forEach(obs => obs.subscribe(res => {
const currentImages = this.$images.value;
currentImages.push(res);
this.$images.next(currentImages);
}));
为了在所有请求完成时实现回调,我尝试了以下方法。
const requests: Observable[] = getRequests();
const finishedTracker = new Subject<void>();
requests.forEach(obs => obs.subscribe(res => {
const currentImages = this.$images.value;
currentImages.push(res);
this.$images.next(currentImages);
}));
forkJoin(requests).subscribe(() => {
finishedTracker.next();
finishedTracker.complete();
console.log('requests done');
});
这可行,但对我来说似乎很奇怪,我需要拆分请求的 forkJoin 和订阅。有没有更好的方法来实现这个功能?我也查看了 mergeMap,但无法使其正常工作。
编辑根据评论,我意识到订阅两次意味着发出两次请求。因此我尝试了另一种实现方式。
from(requests).pipe(
mergeMap(o => {
o.subscribe(res => {
const currentImages = this.$images.value;
currentImages.push(res);
this.$images.next(currentImages);
}
return o;
}, 10)
).subscribe(() => {
finishedTracker.next();
console.log('requests done');
})
我没有使用 forkJoin
的结果,因为据我所知,它给了我所有请求的结果。因此需要等待他们完成。由于每个请求都比较快,但通常有数百个请求,所以我需要在每个请求完成后立即将它们各自的结果传递给行为主体。
编辑 2 我采用的解决方案。
from(requests).pipe(
mergeMap(request => request, 10),
scan<ImageResponse, ImageResponse[]>((all, current, index) => {
all = all.concat(current);
this.$images.next(all);
return all;
}, [])
).subscribe({
complete: () => {
finishedTracker.next();
console.log('requests done');
}});
最佳答案
没有必要在您的 mergeMap
中订阅。事实上,正如其他人所指出的那样,它导致了双重订阅,因为 mergeMap
在内部订阅了您传递给它的函数返回的可观察值。
要在响应发生时对其进行处理,您只需使用管道并在其中添加处理逻辑即可。由于您本质上是在产生副作用(不会修改当前流的输出),因此使用 tap
运营商是合适的:
from(requests).pipe(
mergeMap(o => o.pipe(
tap(res => {
const currentImages = this.$images.value;
currentImages.push(res);
this.$images.next(currentImages);
}),
}, 10)
).subscribe(() => {
finishedTracker.next();
console.log('requests done');
})
虽然这会奏效,但看起来您将可观察的流程复杂化了。我不确定您的用例,但我猜根本不需要主题。如果您的目标是在处理结果时发出累积结果数组,您可以使用 scan
为此,不涉及任何 Subject
或 BehaviorSubject
。要在所有请求完成后执行一些逻辑,您可以传递部分 Observer
,它仅指定 complete
回调(而不是 next
回调,当您将函数作为参数传递给 subscribe()
):
from(requests).pipe(
mergeMap(request => request, 10),
scan((all, current) => all.concat(current), [])
).subscribe({
complete: () => console.log('requests done')
});
编辑:正如@AdrianBrand 所指出的,使用 merge
更简洁而不是 from
/mergeMap
:
merge(...requests, 10).pipe(
scan((all, current) => all.concat(current), [])
).subscribe({
complete: () => console.log('requests done')
})
关于angular - 如何正确地 forkJoin 可观察对象列表或其订阅?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/75433820/
调用 forkJoin 时: forkJoin(observableA, observableB) .pipe(takeUntil(onDestroy$)) .pipe(takeUntil(onD
我有以下代码 const mySource:Observable[]> (...) mySource.pipe( switchMap(arr=>forkJoin(arr)); ) 按预期工作,但
TL; 博士 是否可以在 observable 的管道内使用 forkJoin? 完整故事: 我有一个返回对象数组的 Observable 的服务。对于这些对象中的每一个,我都需要再次调用返回一个可观
今天,我打开我的 Angular 项目,发现一个警告说: forkJoin is deprecated: resultSelector is deprecated, pipe to map inste
我正在尝试 Java ForkJoin 框架并编写了一个简单的测试程序,将图像的像素设置为随机颜色。例如。它会产生伪噪声。 但是在测试性能时,我发现运行单线程实际上比运行多线程更快。我通过通过一个高阈
我有一个方法。 private getData() { console.log('Begin'); const sub1 = this.http['url1'].get();
我有几种形式的方法 getFirstDataBunch() { return this.repository.getData(this.parameter).pipe(
我正在使用 forkJoin 合并两个 Firebase 请求的结果 两个请求都完成并记录在控制台中,但 forkJoin 本身的映射函数不会触发,因此没有结果返回到应用程序 public initG
stackblitz 在下面的代码中,this.form.valueChanges 和 this.form.get('name').valueChanges 发射正常。但是 forkJoin 这些都不
我不明白为什么我可以这样做 ForkJoin(service.Obs1(), service.Obs2()).subscribe 但是如果我重构我的代码,以便我向我的服务类添加一个返回 ForkJoi
我的应用程序将进行多个并行 http 调用。它基于 switch-fall though 逻辑,因此查询的数量会有所不同(1、2 或 3 次调用)。有没有办法让我知道哪个电话首先返回?即要么分配一个
我正在尝试使用 forkJoin 运算符来组合两个可观察值并输出一个新值。其中一个调用返回一组对象,其中包含与保单上的多个人相关的数据(该部分不能更改)。另一个 observable 告诉我们谁登录了
对于那里的 Angular 专业人士来说,这应该是一个简单的问题。我有这个简单的方法: private addMultiple(products: Product[]): void { let
我刚刚下载了 Reactive Extensions for .NET 的稳定版本,正在查看 101 Rx examples .并行执行示例使用 ForkJoin方法。这种方法不是稳定版本(或不稳定版
我正在尝试使用 forkJoin 来运行一个带有键和值的对象数组,该值是可观察的。我需要将它设置为对象数组将导致键相同且值是 observable api 调用的结果的位置。这个对象可以有任意数量的键
什么会导致单个 subscribe 触发,但 forkJoin 不触发? 我已经发送了一个触发 subject.subscribe 的请求,但是当我将其添加到与 forkJoin 一起使用的数组时,f
我尝试通过 rx-http-request 使用 RxJS 从 NodeJS 服务器查询第三方 API。从长远来看,我想使用 RxJS 来处理一些更有趣的错误情况。但目前我在运行一个相当微不足道的案例
我目前正在使用 combineLatest() 方法来获取我所有的 friend 并将其作为列表。但是,当我尝试从头开始删除某个项目时,它会产生排序问题。 ( View 无法正确渲染)因此,我想从 c
我有一个使用 Firebase Storage 发送一些图像数据的应用程序,因此我使用此方法: startUpload() { if (typeof this.fileList !== 'unde
我有一个非常大的有序可观察对象列表,需要并行运行。当每个可观察对象返回时,它们将结果附加到行为主题,这就是结果传递的方式。但是,我需要在它们全部完成时调用一个特定的函数。 每个 observables
我是一名优秀的程序员,十分优秀!