gpt4 book ai didi

java - RxJava v1 到 v2

转载 作者:行者123 更新时间:2023-11-30 06:19:53 24 4
gpt4 key购买 nike

我正在尝试学习 RxJava 2,但从昨天下午开始,我在尝试将 RxJava 库的 v1 转换为 v2 时遇到了麻烦...我遇到了这个函数,我可以用它来做我想要的事情这样做可以帮助我理解整个响应式(Reactive)编程范式。

List<Integer> emitList = ...;
Observable<Integer> observable = Observable.from(emitList);

observable
.subscribeOn(Schedulers.newThread())
.parallel((a) -> {
return a
.filter((i) -> {
return i % 2 == 0;
})
.doOnNext((xx) -> {
System.out.println("parallel thread in: " + ThreadUtils.currentThreadName());
System.out.println("parallel: " + xx);
ThreadUtils.sleep(10);
System.out.println("parallel thread out: " + ThreadUtils.currentThreadName());
});
},
Schedulers.io()
)
.subscribe(
(i) -> {
System.out.println("onNext thread entr: " + ThreadUtils.currentThreadName());
System.out.println(i);
System.out.println("onNext thread exit: " + ThreadUtils.currentThreadName());
},
(t) -> {
t.printStackTrace();
},
() -> {
System.out.println("onCompleted()");
}
);

我最远的距离是这样的:

Observable<Integer> observable = ....
observable.subscribeOn(Schedulers.newThread())
.filter(i -> i % 2 == 0)
.doOnNext(i -> {
System.out.println("parallel thread in: " + threadName());
System.out.println("parallel: " + i);
Thread.sleep(10);
})
.subscribe(
number -> System.out.println(threadName() + ": " + number),
throwable -> System.err.println(threadName() + ": " + throwable.toString()),
() -> System.out.println(threadName() + ": Completed!")

);

我知道我正在做的事情有很多错误。对于初学者来说,过滤和 doOnNext 位于并行子句的内部,而在我的“方法”中,它位于并行子句的外部,谁知道还有什么。我尝试在 RxJava 存储库上进行测试,但我无法识别任何与此类似的测试。我查看了 Flowable 和 ParallelFlowable,但它们有很大不同,以至于我找不到如何在我的版本上实现并行性......顺便说一句,它不会打印任何东西。

最佳答案

RxJava 2 中的并行处理与 Flowable 相关联,并使用与 Observable 相同的流畅 API 设计:

Flowable<Integer> f = ....
f.subscribeOn(Schedulers.newThread())
.parallel() // <---------------------------------
.runOn(Schedulers.computation()) // <---------------------------------
.filter(i -> i % 2 == 0)
.doOnNext(i -> {
System.out.println("parallel thread in: " + threadName());
System.out.println("parallel: " + i);
Thread.sleep(10);
})
.sequential() // <---------------------------------
.subscribe(
number -> System.out.println(threadName() + ": " + number),
throwable -> System.err.println(threadName() + ": " + throwable.toString()),
() -> System.out.println(threadName() + ": Completed!")
);

Thread.sleep(10000);

关于java - RxJava v1 到 v2,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48387452/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com