gpt4 book ai didi

java - RxJava 与用户创建的 Flowable 合并只发出最后一个 Flowable

转载 作者:行者123 更新时间:2023-11-29 04:22:39 24 4
gpt4 key购买 nike

当我在两个由 Flowable.create() 创建的 Flowable (a1, a2) 上使用 Flowable.merge() 时,它只会发出 a2。但是当我合并两个由 Flowable.interval() 创建的 Flowable (b1, b2) 时,它将像我预期的那样发出 b1 和 b2。所以我的问题是为什么这两种方法(创建和间隔)会导致两个不同的结果流?

这里是我的问题的一个简单演示:

public class MergeProblem {

private static FlowableEmitter<Integer> emitter;

public static void main(String[] args) throws Exception {

Flowable<Integer> a = Flowable.create(emitter ->
MergeProblem.emitter = emitter, BackpressureStrategy.BUFFER);

Flowable<String> a1 = a.map(x -> "a1 " + x);
Flowable<String> a2 = a.map(x -> "a2 " + x);

Flowable<Long> b = Flowable.interval(1, TimeUnit.SECONDS);
Flowable<String> b1 = b.map(x -> "b1 " + x);
Flowable<String> b2 = b.map(x -> "b2 " + x);

// Flowable<String> c1 = Flowable.interval(1, TimeUnit.SECONDS)
// .map(x -> "c1 " + x);
// Flowable<String> c2 = Flowable.interval(1, TimeUnit.SECONDS)
// .map(x -> "c2 " + x);

Flowable.merge(Arrays.asList(a1, a2, b1, b2/*, c1, c2*/))
.subscribe(System.out::println);

AtomicInteger counter = new AtomicInteger(0);
new Timer().scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
emitter.onNext(counter.getAndIncrement());
}
}, 0, 1000);

Thread.sleep(10000);
}
}

所以输出将是这样的:

a2 0
a2 1
b1 0
b2 0
b2 1
a2 2
...

Observable 也有同样的问题。

最佳答案

区别在于:

emitter -> MergeProblem.emitter = emitter

通常没有理由从 create 中偷偷取出发射器,但你这样做了,并且给定两个不同的 subscribe() 调用,最新的一个将覆盖 MergeProblem.emitter因此您只有最后一个订阅者可以与之交谈。

看起来您应该改用 PublishProcessor,它会向尽可能多的 Subscriber 发送一个 onNext

关于java - RxJava 与用户创建的 Flowable 合并只发出最后一个 Flowable,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48148945/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com