gpt4 book ai didi

rxjs - 当另一个流发出时如何动态更改流间隔

转载 作者:行者123 更新时间:2023-12-05 00:44:42 25 4
gpt4 key购买 nike

所以我有一个由字符串数组组成的流。

const msgs = ['Searching...', 'Gathering the Data...', 'Loading the Dashboard...'];
const msgs$ = Observable.from(msgs);

我需要每 3 秒按顺序发出这些消息之一,直到 dataFetched$流发出 - 这基本上意味着所有数据都到达页面。我无法在它们之间循环。一旦看到最后一条消息但数据尚未到达,则不应更改。

我可以查看 msgs 中的每个字符串随着时间的推移 RxJS zip功能
const longInt$: Observable<number> = Observable.interval(3000);
const longMsgsOverTime$ = Observable.zip(msgs$, longInt$, msg => msg);

然后当 dataFetched$流发出我需要切换到 shortInt$这是
    const longInt$: Observable<number> = Observable.interval(1500);

并继续显示加载消息,直到最后一个可见。每条消息只能看到一次。

这对我来说很费神——我可以勾选一些要求,但不能让它完全工作。
经过多次尝试,我得出结论,我们需要包装 msgs在我们从 longInt$ 切换后,在一个 Subject 中的数组以防止再次循环遍历所有这些。至 shortInt$ .

====================编辑======================
以下是 ggradnig 的答案和代码,我炮制了这个丑陋的东西(用于调试目的):
    setLoaderMsg(){
console.log('%c setting loader msg', 'border: 10px dotted red;');
const msgs$ = Observable.from(['Searching...', 'Gathering the Data...', 'Loading the Dashboard...', 'Something something...', 'yet another msg...', 'stop showing me...']),
shortInt$ = Observable.interval(250).pipe(
tap(v => console.log('%c short v', 'background: green;',v))
),
longInt$ = Observable.interval(3000).pipe(
tap(v => console.log('%c long v', 'background: red;',v)),
takeUntil(this.dataFetched$)
),
// interval$ = Observable.interval(3000).pipe(takeUntil(this.dataFetched$)).pipe(concat(Observable.interval(250))),
interval$ = longInt$.pipe(concat(shortInt$));

Observable.zip(msgs$, interval$, msg => msg)
// Observable.zip(msgs$, interval$)
.pipe(tap(v => {
console.log('%c v', 'background: black; border: 1px solid red; color: white;', v);
this.loaderMsg = v;
}))
.subscribe(
() => {}, //next
() => {}, //error
// () => this.loading = false //complete
() => {
console.log('%c complete', 'background: yellow; border: 2px solid red');
// this.loading = false;
}
);
}

下面是我的 chrome 控制台的屏幕截图,看看会发生什么。 shortInt$流将记录绿色消息,正如您所看到的那样,即使 dataFetched$ 也不会发生。流发射(橙色虚线边框)。
在“获取数据发出”日志下方,我们应该看到绿色背景消息,表示 shortInt$ 流正在发出值。

enter image description here

================================ 第二次编辑 ============== ==========

下面是 dataFetched$可观察:
fetchData(){
console.log('%c fetching now', 'border: 2px dashed purple');

// this.initLoader();

this.dataFetched$ = Observable.combineLatest([
this.heroTableService.getHeroTableData([SubAggs.allNetworks.key], AggFieldsMap.legends.name),
this.researchService.benchmark(this.getSubAggsForOverviewTbl()),
this.multiLineChartService.getEngagementOverTime(this.getSubAggsForNetworkEngagementsOverTimeTable())
]).pipe(
share(),
delay(7000),
tap(v => {
console.log('%c fetch data emit', 'border: 2px dashed orange', v);
})
);


const sub = this.dataFetched$.subscribe(([heroTableRes, benchmarkRes, netByEngRes]: [ITable[], IBenchmarkResponse, any]) => {
// this.loading = false; ==> moved to dashboard

//xavtodo: split this shit into .do() or .tap under each http request call
//hero table logic here
this.heroTableData = heroTableRes;

//////////////////////////////////////////////////////
//engagement-by-network-table
this.engagementByNetworkData = this.researchService.extractDataForEngagementByNetworkTable(benchmarkRes, this.getSubAggsForOverviewTbl());

//////////////////////////////////////////////////////
//network eng over time logic here MULTI LINE CHART
this.engagementMultiLineData = this.multiLineChartService.extractEngagementData(netByEngRes, this.getSubAggsForNetworkEngagementsOverTimeTable());
this.networks = this.multiLineChartService.getNetworks();
this.multilineChartEngagements = Util.getNetworksWithAltNames(this.networks);
this.publishedContentData = this.multiLineChartService.extractPublishedData(netByEngRes);

//combined multiline chart
this.multiLineChartData = {
publishedData: this.publishedContentData,
engagementData: this.engagementMultiLineData
};
});

this.addSubscription(sub);
}

最佳答案

听起来像是 takeUntil 的任务和 concat .

第一个可观察值是 longInt$ .尽快完成 dataFetched$发出。对于此要求,您可以使用 takeUntil .它接受另一个 observable 作为参数,一旦该 observable 发出,源 observable 将完成。

从文档:

Emits the values emitted by the source Observable until a notifier Observable emits a value.



所以,第一步是:
const longInt$: Observable<number> = Observable.interval(3000)
.pipe(takeUntil(dataFetched$));

接下来,您要 concat你的另一个可观察对象, shortInt$ . Concat 会让第一个 observable 完成,然后让第二个 observable 接管。

从文档:

Creates an output Observable which sequentially emits all values from given Observable and then moves on to the next.



这看起来像这样:
const longMsgsOverTime$ = Observable.zip(msgs$, 
longInt$.pipe(concat(shortInt$)), msg => msg);

注:如果您使用的是 RxJS 5 或更低版本,请将 pipe(..) 语句替换为相应的运算符。例如, .pipe(concat(...))变成 .concat(...)
这是 StackBlitz 上的一个例子: https://stackblitz.com/edit/angular-mtyfxd

关于rxjs - 当另一个流发出时如何动态更改流间隔,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52188423/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com