gpt4 book ai didi

angular - RxJS - 分享由 'expand' 产生的无限流

转载 作者:行者123 更新时间:2023-12-05 05:48:12 24 4
gpt4 key购买 nike

我有一个位于网络服务中的分页第三方资源。我想做的是将分页资源转换为值流,并让客户决定使用多少元素。也就是说,客户端应该不知道原始资源已分页。

到目前为止,我得到了以下代码:

import { from, Observable, of } from 'rxjs';
import { expand, mergeMap } from 'rxjs/operators';

interface Result {
page: number;
items: number[];
}

// Assume that this does a real HTTP request
function fetchPage(page: number = 0): Observable<Result> {
let items = [...Array(3).keys()].map((e) => e + 3 * page);
console.log('Fetching page ' + page);
return of({ page, items });
}

// Turn a paginated request into an infinite stream of 'number'
function mkInfiniteStream(): Observable<number> {
return fetchPage().pipe(
expand((res) => fetchPage(res.page + 1)),
mergeMap((res) => from(res.items))
);
}

const infinite$ = mkInfiniteStream();

这非常有效:我得到了一个懒惰的无限数字流,客户端只需执行 infinite$.pipe(take(n)) 并取第一个 n 元素,而不知道底层资源已分页。

现在,我想做的是在处理多个订阅者时共享这些值,即:

infinite$.pipe(take(10)).subscribe((v) => console.log('[1] got ', v));

// Assume that later we have new subscribers
setTimeout(() => {
infinte$.pipe(take(5)).subscribe((v) => console.log('[2] got ', v));
}, 1000);

setTimeout(() => {
infinte$.pipe(take(4)).subscribe((v) => console.log('[3] got ', v));
}, 1500);

如您所见,我们将有多个订阅者订阅无限流,我想重用已经发出的值以减少fetchPage的数量电话。在此示例中,一旦客户端请求 10 个项目 (take(10)),那么任何请求少于 10 个项目(例如 5 个项目)的客户端都不会调用 fetchPage 因为那些项目已经发出


我尝试了以下方法,但无法获得所需的行为:

const infinite$ = mkInfiniteStream().pipe(share())

不起作用,因为迟到的订阅者会导致多次调用“fetchPage”。

const infinite$ = mkInfiniteStream().pipe(shareReplay())

强制流中的所有值,即使不需要它们(还没有客户要求所有项目)


如有任何提示,我们将不胜感激。如果有人想尝试代码:https://stackblitz.com/edit/n4ywfw

最佳答案

看起来我们需要找到一种方法来存储一些状态,例如最后读取的页码和获取的项目,并确保在新订阅到达时重播此状态。

一种可能是以这种方式处理闭包。

首先,您创建一个 infiniteStreamFactoryGenerator 函数,它返回一个函数,该函数通过闭包保存一些状态,特别是 lastPageitemsRead。此类状态在 infiniteStreamFactoryGenerator 函数返回的函数内更新。

function infiniteStreamFactoryGenerator() {
let lastPage = 0;
let itemsRead = [];
return () => {
// if there are items read we return them first and then we start
// the infinite stream
return concat(
from(itemsRead),
fetchPage(lastPage).pipe(
expand((res) => {
return res.page < 10 ? fetchPage(res.page + 1) : EMPTY;
}),
tap((res) => {
// here we update the state
lastPage++;
itemsRead = [...itemsRead, ...res.items];
}),
mergeMap((res) => from(res.items))
)
);
};
}

然后我们调用infiniteStreamFactoryGenerator来创建真正的工厂函数,并使用这样的工厂函数来实例化各种流,就像这样

const infiniteStreamFactory$ = infiniteStreamFactoryGenerator();

infiniteStreamFactory$()
.pipe(take(10))
.subscribe((v) => console.log("[1] got ", v));

// Assume that later we have new subscribers
setTimeout(() => {
infiniteStreamFactory$()
.pipe(take(5))
.subscribe((v) => console.log("[2] got ", v));
}, 1000);

setTimeout(() => {
infiniteStreamFactory$()
.pipe(take(4))
.subscribe((v) => console.log("[3] got ", v));
}, 1500);

setTimeout(() => {
infiniteStreamFactory$()
.pipe(take(20))
.subscribe((v) => console.log("[4] got ", v));
}, 2000);

如您所见,我还添加了第四个订阅者,它需要加载其他页面。

根据 this stackblitz这个解决方案似乎有效。

老实说,必须构建一个工厂生成器函数的想法让我想到可能有更简单的方法来解决这个有趣的问题,但我还没有找到。

关于angular - RxJS - 分享由 'expand' 产生的无限流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/70858380/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com