gpt4 book ai didi

java - RxJava2 Flowable区分最后一个元素

转载 作者:行者123 更新时间:2023-11-30 05:28:43 25 4
gpt4 key购买 nike

我想处理 Flowable分批,在每个批处理之后等待,直到完成一些异步工作而不阻塞线程,并以不同的方式处理最后一个批处理。有没有比使用 AtomicReference 更好的方法缓存前一批并处理 onComplete() 中的最后一批?

      AtomicReference<List<Integer>> batchRef = new AtomicReference<>();
Flowable.just(1, 2, 3, 4, 5, 6)
.buffer(2)
.concatMapCompletable(batch -> {
List<Integer> previousBatch = batchRef.getAndSet(batch);
if (previousBatch != null) {
System.out.println("Regular batch: " + previousBatch);
}
// Something asynchronous would go here
return Completable.complete();
})
.subscribe(() -> {
System.out.println("Last batch: " + batchRef.get());
});

最佳答案

因此您希望以特定方式对流事件使用react。最直接的方法是创建自定义订阅者类,它完全满足您的需要:

static class MySubscriber implements FlowableSubscriber<List<Integer>> {
List<Integer> previousBatch;

@Override
public void onSubscribe(Subscription s) {
s.request(Long.MAX_VALUE);
}

@Override
public void onNext(List<Integer> batch) {
if (previousBatch != null) {
System.out.println("Regular batch: " + previousBatch);
}
previousBatch = batch;
}

@Override
public void onError(Throwable throwable) {}

@Override
public void onComplete() {
System.out.println("Last batch: " + previousBatch);
}
}

public static void main(String[] args) {
Flowable.just(1, 2, 3, 4, 5, 6)
.buffer(2)
.subscribe(new MySubscriber());
}

}

关于java - RxJava2 Flowable区分最后一个元素,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58008980/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com