gpt4 book ai didi

java - 如何在 RxJava 中让多个订阅者订阅一个分组的 Observable?

转载 作者:行者123 更新时间:2023-11-29 09:29:40 25 4
gpt4 key购买 nike

这是使用 RxJava 版本 0.19.6

在 groupBy 操作之外,可以创建一个由以下代码描述的管道,例如,根据某些条件从 Observable 中选择一条记录,或者选择满足某些替代条件的第一条记录:

Observable<Long> observable = Observable.interval(1, TimeUnit.MILLISECONDS).take(10);
Observable<Long> filter1 = observable.filter(new Func1<Long, Boolean>() {
@Override
public Boolean call(Long aLong) {
return 5 == aLong % 5;
}
});
Observable<Long> filter2 = observable.filter(new Func1<Long, Boolean>() {
@Override
public Boolean call(Long aLong) {
return 2 == aLong % 5;
}
});
BlockingObservable.from(Observable.concat(filter1, filter2).first()).forEach(new Action1<Long>() {
@Override
public void call(Long aLong) {
System.out.println(aLong);
}
});

...不幸的是,由于对 GroupedObservable 的限制,同类过程似乎无法在分组上下文中运行:

BlockingObservable.from(observable.groupBy(new Func1<Long, Long>() {
@Override
public Long call(Long aLong) {
return aLong % 5;
}
}).flatMap(new Func1<GroupedObservable<Long, Long>, Observable<Long>>() {
@Override
public Observable<Long> call(GroupedObservable<Long, Long> in) {
Observable<Long> filter1 = in.filter(new Func1<Long, Boolean>() {
@Override
public Boolean call(Long aLong) {
return 5 == aLong % 5;
}
});
Observable<Long> filter2 = in.filter(new Func1<Long, Boolean>() {
@Override
public Boolean call(Long aLong) {
return 2 == aLong % 5;
}
});
return Observable.concat(filter1, filter2).first();
}
})).forEach(new Action1<Long>() {
@Override
public void call(Long aLong) {
System.out.println(aLong);
}
});

...导致多订阅者异常(线程“主”java.lang.IllegalStateException 中的异常:只允许一个订阅者!)。

我是否遗漏了一些明显的解决方法?在这种情况下,我曾尝试使用 ConnectableObservables 来呈现单个订阅者的外观,但这些尝试也都失败了(肯定是由于我的无知)。


在相关说明中,groupByUntil 似乎也为您提供了对 GroupedObservable 的引用,如果我实际上尝试使用它来确定何时关闭窗口,这让我很头疼地提示多个订阅者。在这里,我再次确信我忽略了一些明显的东西,因为 API 显然希望人们使用 GroupedObservable!

最佳答案

您可以在 GroupedObservable 上使用 .cache()。

final Observable<Long> inCached = in.cache();

然后在您的过滤器中使用生成的 Observable。

Observable<Long> filter1 = inCached.filter(new Func1<Long, Boolean>() {
@Override
public Boolean call(Long aLong) {
return 5 == aLong % 5;
}
});

这样每个订阅者都会看到相同的项目,但 GroupedObservable 只会有一个订阅者。

关于java - 如何在 RxJava 中让多个订阅者订阅一个分组的 Observable?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25374403/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com