gpt4 book ai didi

java - RxJava -- 终止无限流

转载 作者:塔克拉玛干 更新时间:2023-11-03 02:59:20 24 4
gpt4 key购买 nike

我正在探索响应式编程和 RxJava。这很有趣,但我被困在一个我找不到答案的问题上。我的基本问题:终止无限运行的 Observable 的响应式适当方法是什么?我也欢迎对我的代码提出批评和响应式最佳实践。

作为练习,我正在编写日志文件尾部实用程序。日志文件中的行流由 Observable<String> 表示.获取 BufferedReader为了继续阅读添加到文件中的文本,我忽略了通常的 reader.readLine() == null终止检查并将其解释为意味着我的线程应该 hibernate 并等待更多记录器文本。

但是虽然我可以使用 takeUntil 终止 Observer ,我需要找到一种干净的方法来终止无限运行的文件观察器。我可以自己写 terminateWatcher方法/字段,但这打破了 Observable/Observer 封装——我想尽可能严格地遵守响应式(Reactive)范例。

这是 Observable<String>代码:

public class FileWatcher implements OnSubscribeFunc<String> {
private Path path = . . .;

@Override
// The <? super String> generic is pointless but required by the compiler
public Subscription onSubscribe(Observer<? super String> observer) {
try (BufferedReader reader = new BufferedReader(new FileReader(path.toFile()))) {
String newLine = "";
while (!Thread.interrupted()) { // How do I terminate this reactively?
if ((newLine = reader.readLine()) != null)
observer.onNext(newLine);
else
try {
// Wait for more text
Thread.sleep(250);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
observer.onCompleted();
} catch (Exception e) {
observer.onError(e);
}

return null; // Not sure what Subscription I should return
}
}

这是 Observer 代码,它在新行出现时打印它们:

public static void main(String... args) {
. . .
Observable<String> lines = Observable.create(createWatcher(file));
lines = lines.takeWhile(new Func1<String, Boolean>() {
@Override
public Boolean call(String line) {
// Predicate for which to continue processing
return !line.contains("shutdown");
}
}).subscribeOn(Schedulers.threadPoolForIO())
.observeOn(Schedulers.currentThread());
// Seems like I should use subscribeOn() and observeOn(), but they
// make my tailer terminate without reading any text.

Subscription subscription = lines.subscribe(new Action1<String>() {
@Override
public void call(String line) {
System.out.printf("%20s\t%s\n", file, line);
}
});
}

我的两个问题是:

  1. 什么是 react 一致的方式来终止一个无限运行的流?
  2. 我的代码中还有哪些让您哭泣的错误? :)

最佳答案

由于您是在请求订阅时启动文件监视,因此在订阅结束时终止它是有意义的,即当 订阅 关闭时。这解决了代码注释中的一个松散问题:您返回一个 Subscription,它告诉 FileWatcher 停止监视文件。然后替换您的循环条件,使其检查订阅是否已被取消,而不是检查当前线程是否已被中断。

问题是,如果您的 onSubscribe() 方法永远不会返回,这就不是很有用了。也许您的 FileWatcher 应该需要指定一个调度程序,并在该调度程序上进行读取。

关于java - RxJava -- 终止无限流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/20663255/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com