gpt4 book ai didi

angular - RxJs难题:共享流并在有新订阅时有条件地重试

转载 作者:行者123 更新时间:2023-12-04 23:13:47 31 4
gpt4 key购买 nike

内容:

我们的Web应用程序可以同时显示不同的帮助面板。
当面板要求给定ID(例如help-id-1)时,我们将到达一个API,并传递该ID,然后获得所需的帮助。

现在,由于某些原因,我们可能会在同一帮助面板上显示两次或更多次。但是,我们当然不希望为同一项目多次击中该API,如果没有错误或当前正在获取该API。

我们的“生产者”为我们提供了检索它的冷流:

const getHelpContentById = (id: string) => fromPromise(
httpCallToGetHelpResultFromThirdLib(id)
).pipe(
catchError(error => of({ status: 'ERROR', item: null, id })),
// extracting the body of the response
map(getHelpItemFromResponse),
// wrapping the response into an object { status: 'SUCCES', item, id }
map(item => setStatusOnHelpItem(item, id)),
startWith({ status: 'LOADING', item: null, id }),
)


我们从包含状态的对象开始流,然后再接收另一个具有新状态的对象,该对象是 SUCCESSERROR

预期的解决方案应:
-在首次调用给定ID时从API中提取
-如果在上一个订阅完成之前,另一个订阅发生了相同的ID(状态 LOADING),则该订阅应获得与第一次调用相同的流,而无需再次获取API
-如果应用的一部分显示 help-id-1失败,则不应关闭该流,而应关闭其中的 next类型为 { status: 'ERROR', item: null, id }的值,这样,如果另一个组件尝试再次显示 help-id-1,由于该ID的最后状态为 ERROR,因此它应尝试再次访问API,并且两个订阅者都应收到实时更新 { status: 'LOADING', item: null, id },然后返回错误或成功

第一次尝试:
这是我想到的第一种非RxJs方法:
(从服务中提取的代码,这是一个类)

private helpItems: Map<
string,
{ triggerNewFetchForItem: () => void; obs: Observable<HelpItemWithStatus> }
> = new Map();

private getFromCacheOrFetchHelpItem(id: string): Observable<HelpItemWithStatus> {
let triggerNewFetchForItem$: BehaviorSubject<HelpItemWithStatus>;
const idNotInCache = !this.helpItems.has(id);

if (idNotInCache) {
triggerNewFetchForItem$ = new BehaviorSubject<HelpItemWithStatus>(null);

this.helpItems.set(id, {
triggerNewFetchForItem: () => triggerNewFetchForItem$.next(null),
obs: triggerNewFetchForItem$.pipe(
switchMap(() => getHelpContentById(id)),
shareReplay(1),
),
});

return this.helpItems.get(id).obs;
} else {
return this.helpItems.get(id).obs.pipe(
tap(item => {
if (item.status === ContentItemStatus.ERROR) {
this.helpItems.get(id).triggerNewFetchForItem();
}
})
);
}
}

public getHelpItemById(id: string): Observable<HelpItemWithStatus> {
return this.getFromCacheOrFetchHelpItem(id);
}


我同事的尝试:

private getFromCacheOrFetchHelpItem4(id: string): Observable<HelpItemWithStatus> {
let item = this.items.get(id);

if (item && item.status !== ContentItemStatus.ERROR) {
return of(item);
}

return getNewWrappedHelpItem(this.contentfulClient, id).pipe(
tap(item => this.items.set(id, bs),
shareReplay(1)),
)
}


问题
-如果您订阅了一次,但最终报错
-您从新组件再次订阅
-按预期执行另一次获取,但更改了引用
-API调用成功,第二个组件已更新,第一个未更新

结论:
我敢肯定,有一种更好的Rx方式可以做到这一点,即使不依赖“外部缓存”(此处为 Map)。最好的办法显然是为此配备一个新的运算符:)

最佳答案

我基本上在工作相同的场景。我已经考虑了一段时间,但最终决定花点时间。我的解决方案有些粗糙,但是我会在优化时尝试更新答案。我希望能有更好的方法来提供反馈。

问题/步骤

这是一个相当复杂的问题,所以我将逐步进行构建。为了简单起见,我们将从不带参数的api方法开始。

共享流,以便多个订阅者不会触发多个api请求

只需在最后添加share()运算符即可使其成为多播。

return api().pipe(share());


共享API的最新响应

只需将 share()更改为 shareReplay(1)。该参数指示要共享的先前响应的数量。我们只希望发出最后一个,所以我们放入 1

或者,您可以使用 tap运算符保留对最后一个发出的值的引用,并执行 of(data)而不是如果最后一个成功,则不返回流。仅当您不再希望再次调用api时才适用(如所讨论的OP),但我将其保持通用性以使其灵活地适用于其他解决方案。

return api().pipe(shareReplay(1));


如果最后一个失败,则允许新订阅触发新的api请求

这太笨了。很容易获得最后的值,甚至很容易为新订户重新运行流。但这并不能使以前的订户受益。您可能会获得成功的结果,但是不会通知任何先前的订阅者。本质上,您要的是让您的主题在有新订阅时发出新值。据我所知,这是不可能的。

我的工作是设置自己的主题,每当有人请求该信息流时,我就可以触发该主题。这不是一回事,但我认为这确实是我们想要的。我们真正想要的是某种重试方法。如果不是通过使用 retryWhen运算符来实现自动化,则我们需要某种手动方式,例如加载新组件。加载新组件时,它们会请求流,因此可以正常工作。

因此,我们创建一个主题,并在超时后调用下一步。我宁愿使用 ReplaySubjectBehaviorSubject来避免超时,但这样做时( ExpressionChangedAfterItHasBeenCheckedError)遇到了角度变化检测的问题。我需要更深入地研究它。

请注意, share在外部流上。我们想分享它而不是内在的。另请注意,由于我们每次都需要一个新的内部流,因此我使用的是 switchMap而不是 switchMapTo

const trigger = new Subject<void>();
setTimeout(() => trigger.next());
return trigger.pipe(
switchMap(() => api()),
shareReplay(1)
);


发生错误后让流保持活动状态

catchError运算符可让您返回可观察值。由于我们希望将其作为消息,因此我们只执行 catchError(e => of(e))。问题在于这将结束流。解决方法是将捕获的内容放入 switchMap的内部,以便内部流可以死掉而外部流可以继续前进。

return trigger.pipe(
switchMap(() => api().pipe(
catchError(err => of(err))
),
shareReplay(1)
);


知道api的状态

为此,我们将创建一个具有type属性的通用响应包装。可能的值为“ FETCHING”,“ SUCCESS”和“ FAILURE”。 api调用开始后,我们将使用 startWith运算符发送获取通知(因此为什么会结束)。

return trigger.pipe(
switchMap(() => api().pipe(
map((data) => ({ state: 'SUCCESS', data })),
catchError(err => of({ state: 'FAILURE', err })),
startWith({ state: 'FETCHING' })
),
shareReplay(1)
);


一次仅允许一个机上请求

基本上,如果请求正在进行中,我们不希望不调用触发器。我们可以使用标志来执行此操作,也可以将 distinct运算符与触发器配合使用以在api调用解析后将其重置。第二种方法很棘手,因为在构造流时需要引用该流。因此,我们将只使用一个变量,并且可以将 trigger.next()包装在if中,也可以在流上放置一个过滤器。我将做一个过滤器。

private state: string;
...
return trigger.pipe(
filter(() => this.state !== 'FETCHING'),
switchMap(() => api().pipe(
map((data) => ({ state: 'SUCCESS', data })),
catchError(err => of({ state: 'FAILURE', err })),
startWith({ state: 'FETCHING' }),
tap(x => { this.state = x.state; })
),
shareReplay(1)
);


仅重试错误,不成功

为此,您要做的就是不调用触发器。因此,只需将触发器上的条件更改为未初始化状态或“失败”即可。

...
filter(() => this.state == null || this.state === 'FAILURE'),
...


共享相同参数的流

基本上,您只需要散列参数并将其用作映射的键​​即可。请参阅下面的完整示例。



这就是全部。我创建了一个辅助函数,该函数将生成一个api方法。开发人员必须为参数提供api方法和哈希方法,因为推断起来会过于复杂。

import { Observable } from 'rxjs/Observable';
import { Subject } from 'rxjs/Subject';
import { tap, switchMap, map, startWith, catchError, shareReplay, filter } from 'rxjs/operators';
import { of } from 'rxjs/observable/of';

// posible states of the api request
export enum ApiStateType {
Fetching,
Success,
Failure
}

// wrapper for the api status messages
export interface ApiStatus<T> {
state: ApiStateType;
params: any[],
data: T
}

// information related to a stream for a unique set of parameters
interface StreamConfig<T> {
state: ApiStateType;
trigger: Subject<void>;
stream: Observable<ApiStatus<T>>;
}

export function generateCachedApi<T>(
api: (...params) => Observable<T>,
generateKey: (...params) => string
): (...params) => Observable<ApiStatus<T>> {
const cache = new Map<string, StreamConfig<T>>();

return (...params): Observable<ApiStatus<T>> => {
const key = generateKey(...params);
let config = cache.get(key);

if (!config) {
console.log(`created new stream (${key})`);
config = <StreamConfig<T>> { trigger: new Subject<void>() };
config.stream = config.trigger.pipe(
filter(() => config.state == null || config.state === ApiStateType.Failure),
switchMap(() => {
return api(...params).pipe(
map((data) => (<ApiStatus<T>>{ state: ApiStateType.Success, params, data })),
catchError((data, source) => of(<ApiStatus<T>>{ state: ApiStateType.Failure, params, data })),
startWith(<ApiStatus<T>>{ state: ApiStateType.Fetching, params }),
tap(x => { config.state = x.state; })
);
}),
tap(x => { console.log('PUBLISH', x)}),
shareReplay(1),
);
cache.set(key, config);
} else {
console.log(`returned existing stream (${key})`);
}
setTimeout(() => { config.trigger.next() });
return config.stream;
}
}


这是我一起破解的一个运行示例: https://stackblitz.com/edit/api-cache


我敢肯定,有一种更好的Rx方式可以做到这一点,甚至无需依靠“外部缓存”(此处的Map)。最好的办法显然是为此配备一个新的运算符:)


我创建了一个 cacheMap运算符来尝试做到这一点。我有一个发出api参数的源, cacheMap运算符将为唯一的一组参数查找或创建流,并返回其 mergeMap样式。问题在于,现在每个订阅者都将订阅该内部可观察对象。因此,您必须添加一个过滤器(请参见下面的替代解决方案)。

替代解决方案

这是我想到的另一种解决方案。代替维护多个流,您可以拥有一个主要流,并使用过滤器将其提供给订阅者。

单个流的问题是重播将应用于所有参数。因此,您要么必须使用没有缓冲区的重放,要么自己管理重放。

如果使用不带缓冲区的重播,则它将重播 FETCHINGSUCCESS的所有内容,这可能会导致额外的处理,尽管用户可能不会注意到。理想情况下,我们将有一个 replayByKey运算符,但我还没有写出来。因此,现在我仅使用地图。使用地图的问题在于,我们仍然向已经收到地图的订户发送相同的值。因此,我们将 distinctUntilChanged运算符添加到实例流。或者,您可以创建实例流,然后在其上放一个 takeUntil,触发器是为成功进行过滤的实例流,并放一个 delay(0)以允许最后一个值在关闭之前通过管道。这样就可以完成数据流,因为一旦成功就永远不会获得新的值,就可以了。我采用了与众不同的方法,因为如果您想更改其要求,它可以使您获得新的价值。

我们使用 mergeMap而不是 switchMap,因为我们可以同时进行针对不同参数的运行中请求,并且我们不想取消针对不同参数的请求。

解决方法如下:

import { Observable } from 'rxjs/Observable';
import { Subject } from 'rxjs/Subject';
import { tap, mergeMap, map, startWith, catchError, share, filter, distinctUntilChanged } from 'rxjs/operators';
import { of } from 'rxjs/observable/of';

// posible states of the api request
export enum ApiStateType {
Fetching,
Success,
Failure
}

// wrapper for the api status messages
export interface ApiStatus<T> {
state: ApiStateType;
key: string;
params: any[];
data: T;
}

export function generateCachedApi<T>(
api: (...params) => Observable<T>,
generateKey: (...params) => string
): (...params) => Observable<ApiStatus<T>> {
const trigger = new Subject<any[]>();
const stateCache = new Map<string, ApiStatus<T>>();
const stream = trigger.pipe(
map<any[], [any[], string]>((params) => [ params, generateKey(...params) ]),
tap(([_, key]) => {
if (!stateCache.has(key)) {
stateCache.set(key, <ApiStatus<T>> {})
}
}),
mergeMap(([params, key]) => {
const apiStatus = stateCache.get(key);
if (apiStatus.state === ApiStateType.Fetching || apiStatus.state === ApiStateType.Success) {
return of(apiStatus);
}
return api(...params).pipe(
map((data) => (<ApiStatus<T>>{ state: ApiStateType.Success, key, params, data })),
catchError((data, source) => of(<ApiStatus<T>>{ state: ApiStateType.Failure, key, params, data })),
startWith(<ApiStatus<T>>{ state: ApiStateType.Fetching, key, params }),
tap(state => { stateCache.set(key, state); })
)
}),
tap(x => { console.log('PUBLISH', x)}),
share()
);

return (...params): Observable<ApiStatus<T>> => {
const key = generateKey(...params);
const instanceStream = stream.pipe(
filter((response) => response.key === key),
distinctUntilChanged()
);
setTimeout(() => { trigger.next(params) });
return instanceStream;
}
}

关于angular - RxJs难题:共享流并在有新订阅时有条件地重试,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49069193/

31 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com