
rx-java2 - Interrupting a single observable inside concatMap

Reposted. Author: 行者123. Last updated: 2023-12-04 16:03:57

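I use concatMap to process a stream of items one at a time, running a long-running operation for each. At some point I need to "interrupt" that long-running operation, but only for the current item: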

@Test
public void main() throws InterruptedException {
    TestObserver<String> test = Observable.just(1, 2, 3, 4, 5)
            .concatMap(this::doLongRunningOperation)
            .test();

    Thread.sleep(10000);
    System.out.println("interrupt NOW");
    // Now I need to interrupt whichever longRunningOperation is in
    // progress, but I don't want to interrupt the whole stream.
    // In other words, I want to force it to move on to the next
    // integer.
}

Observable<String> doLongRunningOperation(final Integer integer) {
    return Observable
            .just("\tStart working on " + integer,
                    "\tStill working on " + integer,
                    "\tAlmost done working on " + integer)

            // delay each item by 2 seconds
            .concatMap(string -> Observable.just(string).delay(2, TimeUnit.SECONDS))
            .doOnNext(System.out::println)
            .doFinally(() -> System.out.println("\tfinally for " + integer));
}

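I tried to solve this by keeping the Disposable of the "inner" stream and disposing it at the right time. That did not work. The inner stream was disposed, but concatMap never moved on to item 3. The test just hangs (because the outer observable never completes, terminates, or gets disposed either).
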
Disposable disposable = Disposables.empty();

@Test
public void main() throws InterruptedException {
    TestObserver<String> test = Observable.just(1, 2, 3, 4, 5)
            .concatMap(this::doLongRunningOperation)
            .test();

    Thread.sleep(10000);
    System.out.println("interrupt NOW");
    disposable.dispose();

    test.awaitTerminalEvent();
    System.out.println("terminal event");
}

Observable<String> doLongRunningOperation(final Integer integer) {
    return Observable
            .just("\tStart working on " + integer,
                    "\tStill working on " + integer,
                    "\tAlmost done working on " + integer)

            // delay each item by 2 seconds
            .concatMap(string -> Observable.just(string).delay(2, TimeUnit.SECONDS))
            .doOnNext(System.out::println)
            .doFinally(() -> System.out.println("\tfinally for " + integer))
            .doOnSubscribe(disposable -> {
                // save disposable so we can "interrupt" later
                System.out.println("Saving disposable for " + integer);
                Example.this.disposable = disposable;
            });
}

Even if this did work, relying on a side effect seems a bit hacky. What is the best way to achieve this?

Best answer

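I ran into almost the same problem as in "How to cancel individual network request in Retrofit with RxJava?". I was able to "interrupt" the current item using a PublishSubject. Unlike disposing the inner stream, takeUntil completes the inner observable when the subject emits or completes, so concatMap receives an onComplete and moves on to the next item.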

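import io.reactivex.Observable;
import io.reactivex.observers.TestObserver;
import io.reactivex.subjects.PublishSubject;
import java.util.concurrent.TimeUnit;
import org.junit.Test;

public class Example {

    // Reassigned for every item. volatile because items after the first are
    // mapped on the delay scheduler's thread, while the test thread reads it.
    private volatile PublishSubject<Object> interrupter;

    @Test
    public void main() throws InterruptedException {
        TestObserver<String> test = Observable.just(1, 2, 3, 4, 5)
                .concatMap(this::doLongRunningOperation)
                .test();

        Thread.sleep(10000);
        System.out.println("interrupt NOW");
        // Completing the subject ends only the inner observable in progress.
        interrupter.onComplete();

        test.awaitTerminalEvent();
        System.out.println("terminal event");
    }

    Observable<String> doLongRunningOperation(final Integer integer) {
        // A fresh subject per item, so an interrupt only affects this item.
        interrupter = PublishSubject.create();

        return Observable
                .just("Start working on " + integer,
                        "Still working on " + integer,
                        "Almost done working on " + integer)
                // delay each item by 2 seconds
                .concatMap(string -> Observable.just(string).delay(2, TimeUnit.SECONDS))
                .doOnNext(System.out::println)
                .doFinally(() -> System.out.println("Finally for " + integer))
                .takeUntil(interrupter);
    }
}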
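
The answer creates a new subject inside the mapper for every item, which is itself a side effect of the kind the question wanted to avoid. One possible variation, sketched here under assumptions and not taken from the original answer, keeps a single long-lived PublishSubject and calls onNext on it to skip the current item. Because a PublishSubject only delivers to observers that are subscribed at the moment of emission, only the inner observable in progress should be affected. The class name SkipCurrentItem, the field skipCurrent, and the pipeline() helper are hypothetical names chosen for illustration, and a TestScheduler replaces real time so the run is deterministic.

```java
import io.reactivex.Observable;
import io.reactivex.observers.TestObserver;
import io.reactivex.schedulers.TestScheduler;
import io.reactivex.subjects.PublishSubject;
import java.util.concurrent.TimeUnit;

class SkipCurrentItem {
    // Hypothetical names: one subject for the whole pipeline, never reassigned.
    final PublishSubject<Object> skipCurrent = PublishSubject.create();
    // Virtual-time scheduler so the example does not actually sleep.
    final TestScheduler scheduler = new TestScheduler();

    Observable<String> doLongRunningOperation(int n) {
        return Observable
                .just("Start " + n, "Still " + n, "Almost done " + n)
                // delay each item by 2 (virtual) seconds
                .concatMap(s -> Observable.just(s).delay(2, TimeUnit.SECONDS, scheduler));
    }

    Observable<String> pipeline() {
        return Observable.just(1, 2, 3)
                // takeUntil subscribes to skipCurrent only while this inner
                // observable is running, so a signal ends just the current item.
                .concatMap(n -> doLongRunningOperation(n).takeUntil(skipCurrent));
    }

    public static void main(String[] args) {
        SkipCurrentItem demo = new SkipCurrentItem();
        TestObserver<String> test = demo.pipeline().test();

        // Item 1 finishes at 6s; item 2 has emitted only "Start 2" by 9s.
        demo.scheduler.advanceTimeBy(9, TimeUnit.SECONDS);
        demo.skipCurrent.onNext("skip"); // any non-null value works as a signal
        demo.scheduler.advanceTimeBy(60, TimeUnit.SECONDS);

        test.assertComplete();
        // Prints: Start 1, Still 1, Almost done 1, Start 2,
        //         Start 3, Still 3, Almost done 3 (one per line)
        test.values().forEach(System.out::println);
    }
}
```

With real time, the same pipeline would use the default delay(2, TimeUnit.SECONDS) overload and call skipCurrent.onNext(...) from whichever thread decides to interrupt; since that caller is not synchronized with the stream, the subject may need to be wrapped with toSerialized() if several threads can signal at once.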

This post on rx-java2 - Interrupting a single observable inside concatMap is based on a similar question found on Stack Overflow: https://stackoverflow.com/questions/55554929/
