gpt4 book ai didi

java - RXJava2阅读器可用于并发订阅者

转载 作者:行者123 更新时间:2023-12-03 13:08:26 27 4
gpt4 key购买 nike

我正在尝试从Observable<String>实现RXJava2 BufferedReader。到目前为止,一切都很好:

public class Launcher {
public static void main(String[] args) throws Exception {

Process process = Runtime.getRuntime().exec("ls /tmp");
BufferedReader reader = new BufferedReader (new InputStreamReader(process.getInputStream()));

Observable source = Observable.create(emitter -> {
String line = "";
while (line != null) {
line = reader.readLine();
if (line == null)
break;
System.out.println("Engine " + Thread.currentThread().getName() + " - " + line); //process line
emitter.onNext(line);
}
emitter.onComplete();
}).subscribeOn(Schedulers.newThread());

source.subscribe(line -> System.out.println("UI 1 " + Thread.currentThread().getName() + " - " + line));
source.subscribe(line -> System.out.println("UI 2 " + Thread.currentThread().getName() + " - " + line));

TimeUnit.SECONDS.sleep(10);

}
}
onSubscribe使订阅者以并行方式得到通知。如果我没记错的话,这意味着 create()中的lambda将为每个使用者并行执行。

结果,如果我有两个订阅者,每个订阅者将获得阅读器行数的一半。线程1调用 readLine()来获取线程线程2不会得到,只是下一行。

尽管如此,这一切还是有道理的,因为我不知道如何:
  • 在一个线程中读取行
  • 同时通知所有订户-因此每个人都获得所有行

  • 我研究了 Subject,试图链接 Observable,但仍无法弄清楚。

    编辑:我将示例更新为完整的可运行类。据我了解,这个问题是可观察到的热与冷。好像文档说 Observable.create(...)应该创建一个冷的,而我的代码显然表现得很热。

    后续问题:如果我添加类型参数使其成为 Observable<String>,那么 onSubscribe调用将破坏代码,并且它将无法编译,因为这将返回 Observable<Object>。为什么?在中间参数上调用 onSubscribe很奇怪:
    Observable<String> source = Observable.create(emitter -> {...});
    Observable<String> source2 = source.subscribeOn(Schedulers.newThread())

    最佳答案

    使用publish:

    ConnectableObservable<String> o = Observable.create(emitter -> {
    try (BufferedReader reader = ...) {
    while (!emitter.isDisposed()) {
    String line = reader.readLine();
    if (line == null || line.equals("end")) {
    emitter.onComplete();
    return;
    }
    emitter.onNext(line);
    }
    }
    }).subscribeOn(Schedulers.io())
    .publish();

    o.subscribe(/* consumer 1 */);
    o.subscribe(/* consumer 2 */);

    o.connect();

    关于java - RXJava2阅读器可用于并发订阅者,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48655602/

    27 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com