gpt4 book ai didi

java - RxJava 2 Observable onComplete 重新提交自身

转载 作者:行者123 更新时间:2023-12-02 02:57:10 25 4
gpt4 key购买 nike

我是 RxJava 新手。我正在尝试创建一个可观察的对象,当它完成时,它将重新开始,直到我调用 dispose,但是一段时间后我遇到了 OutofMemory 错误,下面是我正在尝试执行的操作的简化示例

  public void start() throws RuntimeException {
log.info("\t * Starting {} Managed Service...", getClass().getSimpleName());

try {

executeObserve();

log.info("\t * Starting {} Managed Service...OK!", getClass().getSimpleName());
} catch (Exception e) {
log.info("Managed Service {} FAILED! Reason is {} ", getClass().getSimpleName(), e.getMessage(), e);
}
}

start 在初始化阶段被调用一次,executeObserve 如下(简化形式..)。请注意,在 onComplete 上我“重新提交”executeObserve

public void executeObserve() throws RuntimeException {

Observable<Book> booksObserve
= manager.getAsObservable();

booksObserve
.map(Book::getAllOrders)
.flatMap(Observable::fromIterable)
.toList()
.subscribeOn(Schedulers.io())
.subscribe(collectedISBN ->
Observable.fromIterable(collectedISBN)
.buffer(10)
// ...some more steps here...
.toList()
.toObservable()
// resubmit
.doOnComplete(this::executeObserve)
.subscribe(validISBN -> {
// do something with the valid ones
})
)
);
}

我的猜测是,如果我想重新提交任务但找不到任何文档,这不是可行的方法。

booksObserve 的实现如下

public Observable<Book> getAsObservable() {
return Observable.create(e -> {
try (CloseableResultSet<Book> rs = (CloseableResultSet<Book>) datasource.retrieveAll())) {
for (Book r : rs) {
e.onNext(r);
}
e.onComplete();
} catch (Exception ex) {
e.onError(ex);
}
});
}

在我们调用 dispose 或等效方法之前不断重新提交操作的正确方法是什么?我正在使用 RxJava 2

最佳答案

您创建了一个无休止的递归,循环将创建越来越多的资源,有时会出现 OutOfMemory/Stack 溢出异常。

为了重复Observable您应该使用的工作 repeat()运算符,它将重新订阅 Observable当它收到onComplete()时。

除此之外,还有一些关于您的代码的一般性评论:

  • 为什么要嵌套第二个 Observable在订阅者内部?你正在打破链条,你可以继续链条而不是创建新的 Observable在订阅者处。
  • 而且,看起来(假设 O bservable.fromIterable(collectedBets) 使用 collectedISBNonNext() o.w. 从哪里来?)您将所有项目收集到一个列表中,然后将其展平再次使用 from iterable,所以看起来你可以继续流,类似这样:

    booksObserve
    .map(Book::getAllOrders)
    .flatMap(Observable::fromIterable)
    .buffer(10)
    // ...some more steps here...
    .toList()
    .toObservable()
    // resubmit
    .doOnComplete(this::executeObserve)
    .subscribeOn(Schedulers.io())
    .subscribe(validISBN -> {
    // do something with the valid ones
    });
  • 无论如何,使用嵌套的 Observablerepeat()运算符将仅重复嵌套的流,而不是整个流(这就是您想要的),因为它未连接到它。

关于java - RxJava 2 Observable onComplete 重新提交自身,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42890312/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com