- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在像这样创建我的可观察源(每 5 秒调用一次 api):const obs$ = Observable.interval(5000).switchMap(() => makeApiCall());
我想修改$obs
使其具有以下特点:
obs$.subscribe(...)
两次,底层代码makeApiCall()
应该只运行一次。 makeApiCall()
错误,我希望(如果可能)所有订阅者都收到错误通知,但重新连接到 $obs
, 并继续做 makeApiCall()
每5秒myBehaviorSubject
, 做一次订阅
obs$.subscribe(myBehaviorSubject)
, 任何其他观察者都应该订阅
myBehaviorSubject
.不确定这是否回答了“可重试”部分。
$obs.shareReplay(1)
会做的伎俩(4个要求)。如果我理解正确,它会将 ReplaySubject(1) 订阅到源 observable,并且 future 的观察者订阅此 ReplaySubject。是否有等效的 shareBehavior?
最佳答案
正如你提到的,shareReplay(1)
几乎可以让你到达那里。它将向当前订阅者多播响应并将最后一个值(如果有)重播给新订阅者。这似乎是您想要的,而不是 shareBehavior
(如果存在)因为您正在调用 api 并且没有初始值。
你应该知道shareReplay
将创建对源流的订阅,但仅在 refCount === 0
时取消订阅并且源流终止(错误或完成)。这意味着在第一次订阅之后,间隔将开始,即使没有更多订阅,它也会继续。
如果您想在无人订阅时停止间隔,请使用 multicast(new ReplaySubject(1)).refCount()
.多播运算符(operator)将创建对源流的单个订阅,并将所有值推送到作为实例 (multicast(new Subject())
) 或由工厂 (multicast(() => new Subject())
) 提供的主题中。多播后流的所有订阅者都将订阅多播主题。因此,当一个值流经多播运营商时,其所有订阅者都将获得该值。您可以更改传递给多播的主题类型以更改其行为。在您的情况下,您可能需要 ReplaySubject
以便它将最后一个值重播给新订阅者。您可以使用 BehaviorSubject
如果你觉得这满足你的需要。
现在multicast
运算符是 connectable
这意味着您必须调用 connect()
在流上使它变热。 refCount
operator 基本上使可连接的 observable 行为类似于普通的 observable,因为它在订阅时会变热,但在没有订阅者时会变冷。这样做是为了保持一个内部引用计数(因此得名 refCount
)。当refCount === 0
它会断开连接。
这与 shareReplay(1)
相同。有一个微小但重要的区别是,当没有更多订阅者时,它将取消订阅源流。如果您在订阅源时使用工厂方法创建新主题(例如:multicast(() => new ReplaySubject(1))
),那么当流从热到冷再到热时,您将失去值(value),因为每次它都会创建一个新主题变热。如果您想在源订阅之间保持相同的主题,那么您可以传入主题而不是工厂(例如:multicast(new ReplaySubject(1))
或使用其别名 publishReplay(1)
。
至于您向订阅者提供错误然后重新订阅的最后要求,您不能调用 error
订阅回调,然后继续获取 next
上的值打回来。如果未处理的错误到达订阅,它将终止订阅。因此,如果您希望您的订阅看到它并且仍然存在,您必须在它到达之前将其捕获并将其转换为普通消息。你可以这样做:catch((err) => of(err))
并以某种方式标记它。如果你想静音然后返回 empty()
.
如果您想立即重试,可以使用 retryWhen
运算符,但您可能希望将其放在共享运算符之前以使其通用。但是,这也可以防止您的订阅者知道错误。由于您的流的根是一个间隔,并且错误来自 switchMap
返回的内部可观察对象,该错误不会终止流的源,但可能会终止订阅。因此,只要您处理错误 (catch/catchError
),就会在下一个间隔重试 api 调用。
此外,您可能需要 timer(0, 5000)
而不是间隔,以便您的 api 调用立即触发,然后以 5 秒的间隔触发。
所以我建议如下:
let count = 0;
function makeApiCall() {
return Rx.Observable.of(count++).delay(1000);
}
const obs$ = Rx.Observable.timer(0, 5000)
.switchMap(() => makeApiCall().catch(() => Rx.Observable.empty()))
.publishReplay(1)
.refCount();
console.log('1 subscribe');
let firstSub = obs$.subscribe((x) => { console.log('1', x); });
let secondSub;
let thirdSub;
setTimeout(() => {
console.log('2 subscribe');
secondSub = obs$.subscribe((x) => { console.log('2', x); });
}, 7500);
setTimeout(() => {
console.log('1 unsubscribe');
firstSub.unsubscribe();
console.log('2 unsubscribe');
secondSub.unsubscribe();
}, 12000);
setTimeout(() => {
console.log('3 subscribe');
thirdSub = obs$.subscribe((x) => { console.log('3', x); });
}, 17000);
setTimeout(() => {
console.log('3 unsubscribe');
thirdSub.unsubscribe();
}, 30000);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.10/Rx.min.js"></script>
publish() === multicast(new Subject())
publishReplay(#) === multicast(new ReplaySubject(#))
publishBehavior(value) === multicast(new BehaviorSubject(value))
关于rxjs - shareReplayLatestWhileConnected with RxJS,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49903028/
RxJS 中是否有一个运算符可以让我缓冲项目并在信号可观察对象触发时将它们一个一个地放出?有点像bufferWhen,但不是在每个信号上转储整个缓冲区,而是每个信号转储一定数量。它甚至可以转储信号 o
我正在像这样创建我的可观察源(每 5 秒调用一次 api): const obs$ = Observable.interval(5000).switchMap(() => makeApiCall())
我有一个 Action ,然后将触发一个ajax请求。 如果由于某种原因操作失败,我什么也不想做。我没有执行可以执行的无操作功能,而不是创建只返回先前状态的空白操作? export default f
在以下代码中:- RxJS.Observable.of(1,2).first().subscribe((x) => console.log(x);); 给定运营商 first() 是否有必要取消订阅?
我有一种情况,可以在很短的时间内将很多事件发送到流中。我想要一个运算符,它是ojit_code和debounceTime的混合体。 以下演示可用于说明我想拥有的https://stackblitz
我的用例如下:我得到事件,有时会突然发生。如果发生突发,我只需要处理一次即可。去抖动会执行此操作。 但是,去抖动仅给我提供连拍的最后一个元素,但我需要了解连拍中的所有元素才能汇总(使用平面图)。 这可
简化以下代码示例的方法是什么? 我找不到合适的运算符..有人可以举一个简短的例子吗? this.returnsObservable1(...) .subscribe( success =>
在RxJS 6中,如何导入静态合并功能以合并Observable列表? 我希望能够做到: const merged$ = merge( obs1$, obs2$, obs3$
我正在阅读 RxJS 的官方文档,然后我意识到它们都在做完全相同的事情。 对我来说,它们看起来完全相似。 如果有区别请指出。 最佳答案 我将根据它们的 Time 版本来描述它们之间的区别,因为这是我最
我对基本的 RxJS 概念有点熟悉,比如 Observables、Observers 和 Subjects,但是 RxJS Notifications概念对我来说是全新的。 它有什么用?我应该什么时候
从 rxjs 6.5 切换到 rxjs 7 后,我遇到了这个奇怪的错误。我不确定这是 rxjs 7 的类型问题还是 stackblitz ( https://stackblitz.com/edit/r
以前我只能使用此代码导入使用过的运算符: import 'rxjs/Observable'; import 'rxjs/add/operator/map'; import 'rxjs/add/oper
combineLatest 函数可以从 rxjs 和 rxjs/operators 导入。 当我从 rxjs/operators 导入它时(就像我导入 combineAll 我收到以下错误: TS23
我有一系列事件通过 fromEventPattern 进行像这样: fromEventPattern(addEventHandler).subscribe(ps$); 由于业务怪癖,我预计有时会抛出异
我是 rxjs 的新手,无法解决这个问题: 我有两个流: 一个有传入的对象 ---a----b----c----d-----> 一个是从列表中选择的对象 ----------------c---->
如果一个 observable 完成,我是否仍然需要取消订阅/处置(在 RxJS 中)该 observable 以删除 Observer(防止内存泄漏),或者一旦 onComplete 或 onErr
我有这样的订阅: this.test.subscribe(params => { ...some code }); 如果我传递回调函数而不是箭头函数,则缺少上下文。 我想将上下文绑定(bind)到
我有一个可观察的: messages: string[] = ['a', 'b', 'c']; const source = from(messages) 你如何延迟它,所以当有人订阅它时,它
我可以让 observable 触发一次该值。但我希望它在变量的值发生变化时发生。实际上我需要一个观察者。这就是我认为 observable 的意义所在。观察事物的值(value)或状态并更新订阅它的
我有以下代码: const fetchBook = (bookId: number) => { const title = 'Book' + bookId; console.log('
我是一名优秀的程序员,十分优秀!