gpt4 book ai didi

rxjs - 如何在 RXJS 中暂停来自一个源的缓冲区?

转载 作者:行者123 更新时间:2023-12-05 04:45:23 28 4
gpt4 key购买 nike

我有一系列事件通过 fromEventPattern 进行像这样:

fromEventPattern<IPsEvent>(addEventHandler).subscribe(ps$);

由于业务怪癖,我预计有时会抛出异常,此时我想对事件进行排队,并在错误状态得到解决后重新触发。

我一直在尝试来自 Pausable buffer with RxJS 的解决方案无济于事。我认为这是因为他们能够通过一个单独的可观察对象进行切换,而这有点要求在中途暂停。在链接示例中,我有 blockingCallsAllowed$而不是 autoSave$ .这是我最近的尝试:

const source$ = new Subject<IPsEvent>();

const blockingCallsAllowed$ = new BehaviorSubject(true);
const on$ = blockingCallsAllowed$.pipe(filter((v) => v));
const off$ = blockingCallsAllowed$.pipe(filter((v) => !v));

source$
.pipe(
map(() => {
try {
// line will throw exception at certain times
myFunction();
return true;
} catch (e) {
const i = setInterval(() => {
try {
myFunction();
console.log('good again');
blockingCallsAllowed$.next(true);
clearInterval(i);
} catch (er) {
// still in flux
}
}, 50);
return false;
}
}),
)
.subscribe(blockingCallsAllowed$);

const output$ = merge(
source$.pipe(bufferToggle(off$, () => on$)),
source$.pipe(windowToggle(on$, () => off$)),
).pipe(concatMap(from));

output$.subscribe((evt) => {
console.log('After buffers', evt);
});

// Add events from the Ps API to the event stream
fromEventPattern(addEventHandler).subscribe(source$);

一切都很好,直到第一个异常,然后它永远不会输出它缓冲的内容,即使它在 console.log 中再次触发一切正常.

我认为依赖 source$.pipe 存在一些时间问题在同一次执行中,然后间隔稍后运行 .next .在此代码的许多不同排列之后,无法确定它。

最佳答案

我不清楚你要实现什么。尽管如果您想每 50 毫秒重试一次 myFunction() 直到它成功并在这种情况发生时停止处理其他事件,concatMap 基本上会为您完成所有这些。

它将在等待内部可观察对象完成时缓冲来自源的排放。

所以你想要的可能是这样的:

source$.pipe(
concatMap(_ => of(true).pipe(
tap(_ => myFunction()),
retryWhen(errors => errors.pipe(
delay(50)
))
))
).subscribe();

关于rxjs - 如何在 RXJS 中暂停来自一个源的缓冲区?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/69167198/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com