作者热门文章
- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
这是使用 RxJava 版本 0.19.6。
在 groupBy 操作之外,可以创建一个由以下代码描述的管道,例如,根据某些条件从 Observable 中选择一条记录,或者选择满足某些替代条件的第一条记录:
Observable<Long> observable = Observable.interval(1, TimeUnit.MILLISECONDS).take(10);
Observable<Long> filter1 = observable.filter(new Func1<Long, Boolean>() {
@Override
public Boolean call(Long aLong) {
return 5 == aLong % 5;
}
});
Observable<Long> filter2 = observable.filter(new Func1<Long, Boolean>() {
@Override
public Boolean call(Long aLong) {
return 2 == aLong % 5;
}
});
BlockingObservable.from(Observable.concat(filter1, filter2).first()).forEach(new Action1<Long>() {
@Override
public void call(Long aLong) {
System.out.println(aLong);
}
});
...不幸的是,由于对 GroupedObservable 的限制,同类过程似乎无法在分组上下文中运行:
BlockingObservable.from(observable.groupBy(new Func1<Long, Long>() {
@Override
public Long call(Long aLong) {
return aLong % 5;
}
}).flatMap(new Func1<GroupedObservable<Long, Long>, Observable<Long>>() {
@Override
public Observable<Long> call(GroupedObservable<Long, Long> in) {
Observable<Long> filter1 = in.filter(new Func1<Long, Boolean>() {
@Override
public Boolean call(Long aLong) {
return 5 == aLong % 5;
}
});
Observable<Long> filter2 = in.filter(new Func1<Long, Boolean>() {
@Override
public Boolean call(Long aLong) {
return 2 == aLong % 5;
}
});
return Observable.concat(filter1, filter2).first();
}
})).forEach(new Action1<Long>() {
@Override
public void call(Long aLong) {
System.out.println(aLong);
}
});
...导致多订阅者异常(线程“主”java.lang.IllegalStateException 中的异常:只允许一个订阅者!)。
我是否遗漏了一些明显的解决方法?在这种情况下,我曾尝试使用 ConnectableObservables 来呈现单个订阅者的外观,但这些尝试也都失败了(肯定是由于我的无知)。
在相关说明中,groupByUntil 似乎也为您提供了对 GroupedObservable 的引用,如果我实际上尝试使用它来确定何时关闭窗口,这让我很头疼地提示多个订阅者。在这里,我再次确信我忽略了一些明显的东西,因为 API 显然希望人们使用 GroupedObservable!
最佳答案
您可以在 GroupedObservable 上使用 .cache()。
final Observable<Long> inCached = in.cache();
然后在您的过滤器中使用生成的 Observable。
Observable<Long> filter1 = inCached.filter(new Func1<Long, Boolean>() {
@Override
public Boolean call(Long aLong) {
return 5 == aLong % 5;
}
});
这样每个订阅者都会看到相同的项目,但 GroupedObservable 只会有一个订阅者。
关于java - 如何在 RxJava 中让多个订阅者订阅一个分组的 Observable?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25374403/
我是一名优秀的程序员,十分优秀!