- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我有 CompositeSubscription ,并在那里添加带有 ReplaySubject 的 Subscription
CompositeSubscription compositeSubscription = new CompositeSubscription();
ReplaySubject subject = ReplaySubject.create();
compositeSubscription.add(
manager.getAllContacts()
.toList()
.doOnNext(new Action1<List<Person>>() {
@Override
public void call(List<Person> persons) {
allPersons = persons;
Log.e(TAG, "BookContacts: " + "allPersons = " + allPersons.size());
setupViewPager();
}
})
.subscribe(subject));
然后我用这个 ReplaySubject 添加第二个订阅
compositeSubscription.add(Observable.combineLatest(subject,
(PublishSubject<List<CustomUser>>) execute(
manager.getDigitsContacts()),
new Func2<List<Person>, List<CustomUser>, Object>() {
@Override
public Object call(List<Person> persons, List<CustomUser> customUsers) {
//... my code with persons and customUsers...
return null;
}
})
.subscribe());
代码正常运行,在完成 ReplaySubject hasCompleted = true 之后。
但是当我尝试添加第三个 Subscription 时,它不会调用“call()”方法
compositeSubscription.add(Observable.combineLatest(subject,
(PublishSubject<List<CustomUser>>) execute(
manager.getFacebookContacts()), //<-----manager.getFacebookContacts() is run, but doesn't call call() method
new Func2<List<Person>, List<CustomUser>, Object>() {
@Override
public Object call(List<Person> persons, List<CustomUser> customUsers) {
//...this method is not called after "manager.getFacebookContacts()"
return null;
}
})
.subscribeOn(Schedulers.newThread())
.subscribe());
如何解决?...因为如果我同时添加订阅,它就可以正常工作。
最佳答案
能否请您将错误回调添加到.subscribe()
?我的猜测是第三次 ReplaySubject
溢出了 combineLatest 的缓冲区。您应该使用 .replay().autoConnect(0)
CompositeSubscription compositeSubscription = new CompositeSubscription();
Observable<List<Person>> persons = manager.getAllContacts()
.toList()
.doOnNext(new Action1<List<Person>>() {
@Override
public void call(List<Person> persons) {
allPersons = persons;
Log.e(TAG, "BookContacts: " + "allPersons = " + allPersons.size());
setupViewPager();
}
}).replay().autoConnect(0, s -> compositeSubscription.add(s));
然后使用persons
代替subject
关于java - 如何使用 ReplaySubject 重新运行 CompositeSubscription?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33611142/
我有一个外部热源在观察者可以订阅之前推送值。订阅后,迟到的观察者应该收到最新的值以及从那时起的每个值。为此,我使用了以下代码(相关行标有“ console.warn('!!!', n)); 这行不通(
如何将多个错误传递给 ReplaySubject? 当我调用 OnError 时,只有第一个异常被传递。我需要多次调用并传递所有错误/异常。 我在内部看到 RX 创建了一个 AnonymousSafe
我有一个 ReplaySubject,它使用 scan 运算符累积数据,每 10000 毫秒应重置一次。还有其他方法吗? 现在: let subject = new ReplaySubject();
我正在使用 ReactiveX/RxJS 版本。 假设我有一个 Rx.ReplaySubject,它每 2 秒发出一个包含 id 和带有值的数组的对象。我想减少这个值数组并得到它们的总和。 问题是 R
我有一个奇怪的用例,我需要跟踪所有以前发出的事件。 感谢 ReplaySubject,到目前为止它运行良好。在每个新订阅者上,此主题都会重新发出以前的每个事件。 现在,对于特定场景,我需要能够只提供最
下面是一个描述我正在尝试做的事情的片段。在我的应用程序中,我有一个贯穿始终的 replaysubject。在某个时候,我想获得主题发出的最后一个值,但是 last似乎不适用于 ReplaySubjec
我在 Angular 4 中使用的模板有问题。该模板实现了一个通知系统,您可以在其中添加新通知,但文档没有指定如何删除观察者 ReplaySubject 的元素。 模板将其实现为服务,如下所示: pr
我想用这样的加入: Observable.forkJoin( this.service1.dataSourceIsAReplaySubject, this.service2.dataS
我在我正在处理的 Silverlight 应用程序中遇到了一个绝对奇怪的行为。请看下面的代码: var replaySubject = new ReplaySubject(1); replay
根据我目前对 RxJS 中 ReplaySubject 的理解,下面的代码应该可以工作: import { ReplaySubject } from 'rxjs/ReplaySubject'; pri
我想创建一个冷可观察对象,它只会在有实际订阅时才开始执行昂贵的操作。 ReplaySubject 非常适合,除了我需要能够在实际订阅而不是创建可观察对象时启动昂贵的后台操作的部分。有办法吗?某种 on
我将状态保存在一个 ReplaySubject 中,它会重播状态的最后一个副本。从该状态,派生出其他 ReplaySubjects 来保持……好吧,派生状态。每个重放主题只需要保存它的最后计算状态/派
如何清除 ReplaySubject 上的缓冲区? 我需要定期清除缓冲区(在我的例子中作为一天结束的事件)以防止 ReplaySubject 不断增长并最终吃掉所有内存。 理想情况下,我希望保持相同的
我有一个 rxjs ReplaySubject,它会根据我所处的环境发出一个值或 null。意思是如果在某个环境中我进行服务调用并获取数据。如果我不在那个环境中,我只是在重播主题上调用 next(nu
我开发了一个具有两个 View 的组件。组件 A 有一个联系表单,组件 B 是“谢谢”页面。 组件A:您填写表格并提交。一旦响应到达,就会创建一个新的 ReplaySubject 值。用户将被路由到组
我需要一种方法来获取最近添加到符合特定条件的 ReplaySubject 的项目。下面的示例代码完成了我需要它做的事情,但感觉不是正确的方法: static void Main(string[] ar
我一直在尝试让我的流仅重播最新的值,但运气不佳。基本上,我有一个 replaySubject(1) 在我不能手动 .onNext 的地方。我想做的是接受它,将它映射到其他东西,然后添加一个初始值。 我
我有 CompositeSubscription ,并在那里添加带有 ReplaySubject 的 Subscription CompositeSubscription compositeSubs
我在 RxSwift 中有这个: func foo() -> Observable { let subject = RxSwift.ReplaySubject.create(bufferSiz
标题 如果您基本上有相同的问题并且您的上下文是 Angular,您可能需要阅读答案中的所有评论以获得更多上下文。 这个问题的简短版本 在做 let observe$ = someReplaySubje
我是一名优秀的程序员,十分优秀!