gpt4 book ai didi

java - RxJava : Split Rx Flowable into multiple streams

转载 作者:行者123 更新时间:2023-12-01 17:57:10 24 4
gpt4 key购买 nike

我想对流进行一些操作,然后将流分成两个流,然后分别处理它们。

显示问题的示例:

Flowable<SuccessfulObject> stream = Flowable.fromArray(
new SuccessfulObject(true, 0),
new SuccessfulObject(false, 1),
new SuccessfulObject(true, 2));

stream = stream.doOnEach(System.out::println);

Flowable<SuccessfulObject> successful = stream.filter(SuccessfulObject::isSuccess);
Flowable<SuccessfulObject> failed = stream.filter(SuccessfulObject::isFail);

successful.doOnEach(successfulObject -> {/*handle success*/}).subscribe();
failed.doOnEach(successfulObject -> {/*handle fail*/}).subscribe();

类(class):

class SuccessfulObject {
private boolean success;
private int id;

public SuccessfulObject(boolean success, int id) {
this.success = success;
this.id = id;
}

public boolean isSuccess() {
return success;
}
public boolean isFail() {
return !success;
}

public void setSuccess(boolean success) {
this.success = success;
}

@Override
public String toString() {
return "SuccessfulObject{" +
"id=" + id +
'}';
}
}

但是此代码将所有元素打印两次,而我想在仅拆分一次之前执行所有操作。

输出:

OnNextNotification[SuccessfulObject{id=0}]
OnNextNotification[SuccessfulObject{id=1}]
OnNextNotification[SuccessfulObject{id=2}]
OnCompleteNotification
OnNextNotification[SuccessfulObject{id=0}]
OnNextNotification[SuccessfulObject{id=1}]
OnNextNotification[SuccessfulObject{id=2}]
OnCompleteNotification

如何处理流以接收此行为?

最佳答案

使用publish共享对源的订阅:

Flowable<Integer> source = Flowable.range(1, 5);

ConnectableFlowable<Integer> cf = source.publish();

cf.filter(v -> v % 2 == 0).subscribe(v -> System.out.println("Even: " + v));

cf.filter(v -> v % 2 != 0).subscribe(v -> System.out.println("Odd: " + v));

cf.connect();

关于java - RxJava : Split Rx Flowable into multiple streams,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43791717/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com