gpt4 book ai didi

java - RxJava 2 : Why can't PublishProcessor subscribe to an Observable?

转载 作者:行者123 更新时间:2023-12-01 19:39:28 27 4
gpt4 key购买 nike

我想在 RxJava 中实现相当简单的 DAG。

我们有元素来源:

Observable<String> itemsObservable = Observable.fromIterable(items)

接下来,我想要一个能够订阅 itemsObservable 的处理器并将允许多个订阅者订阅它。

所以我创建了:
PublishProcessor<String> itemsProccessor = PublishProcessor.create();

不幸的是,这是不可能的:
itemsObservable.subscribe(itemsProccessor);

为什么?实现这种 DAG 的正确 API 是什么?

这是一个演示图:

enter image description here

这是我(失败的)尝试实现这种 DAG:

List<String> items = Arrays.asList("a", "b", "c");
Flowable<String> source = Flowable.fromIterable(items);

PublishProcessor<String> processor = PublishProcessor.create();
processor.doOnNext(s -> s.toUpperCase());

processor.subscribe(System.out::println);
processor.subscribe(System.out::println);
source.subscribe(processor);

最佳答案

这是因为 PublishProcessor 实现了 Subscriber,而 Observable 的 subscribe 方法接受 Observer。您可以将 itemsObservable 转换为 Flowable,它就能完成这项工作。

    Observable<String> items = Observable.fromIterable(Arrays.asList("a","b"));
PublishProcessor<String> processor = PublishProcessor.create();
items.toFlowable(BackpressureStrategy.BUFFER)
.subscribe(processor);

关于java - RxJava 2 : Why can't PublishProcessor subscribe to an Observable?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55808549/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com