gpt4 book ai didi

java - RXJava 暂停缓冲区

转载 作者:太空宇宙 更新时间:2023-11-04 12:45:27 24 4
gpt4 key购买 nike

嗨,我是 RXJava 的新手。我正在寻找一种可观察的解决方案,该解决方案将根据收到的项目继续和暂停发射项目。

假设我们的条件是这个整数谓词:

Func1<Integer, Boolean> isOdd = number -> number%2==1;

当我们向主题添加数字时,如: myNumberSubject.onNext(someInt); 在添加奇数之前添加的所有数字都存储在缓冲区中,但添加第一个奇数时,缓冲区中的所有数字都会一次性发出(包括奇数项)。

之后,只要是奇数,每个数字都会被一一发出。当添加偶数时,它会再次被放入缓冲区。

我一直能够找到任何实际的工作示例,但是这个 pausableBuffer 的弹珠示例可能有潜力完全实现我想要做的事情。 http://rxmarbles.com/#pausableBuffered我希望有一些预先存在的 RXJava 解决方案可以解决这个问题。这是我自己的黑客作业解决方案。

public class PausableBuffer<R> {

private boolean isPaused;
private List<R> buffer;
private ReplaySubject<R> regulatedSubject;

private PausableBuffer(){
regulatedSubject = ReplaySubject.create();
buffer=new LinkedList<>();
}

public static <R>Observable<R> create(Observable<R> observable, Func1<R, Boolean> continueCondition){
PausableBuffer<R> pausableBuffer = new PausableBuffer<>();
observable.subscribe(value -> {
synchronized(pausableBuffer) {
if(pausableBuffer.isPaused){
pausableBuffer.buffer.add(value);
if(continueCondition.call(value)){
pausableBuffer.isPaused=false;
for (R r : pausableBuffer.buffer) {
pausableBuffer.regulatedSubject.onNext(r);
}
pausableBuffer.buffer.clear();
}
}else{
if(continueCondition.call(value)){
pausableBuffer.regulatedSubject.onNext(value);
}else{
pausableBuffer.isPaused=true;
pausableBuffer.buffer.add(value);
}
}
}
});
return pausableBuffer.regulatedSubject.asObservable();
}

public static void main(String[] args) {
BehaviorSubject<Integer> behaviorSubject = BehaviorSubject.create();
Observable<Integer> observable = PausableBuffer.<Integer>create(
behaviorSubject.asObservable(),
intValue -> intValue==5 || 6<intValue);//continueCondition
observable.subscribe(v -> System.out.print(v+", "));
for (int i = 0; i <= 8; i++) {
System.out.print("adding " + i + " : ");
behaviorSubject.onNext(i);
System.out.println();
}
}
}

打印输出:

  • 添加 0:
  • 添加 1:
  • 添加 2:
  • 添加 3:
  • 添加 4:
  • 添加 5:0、1、2、3、4、5、
  • 添加 6:
  • 添加 7 : 6, 7,
  • 添加 8 : 8,

最佳答案

第二个版本:

public final class ContinueWhile<T> implements Observable.Operator<T, T> {

final Func1<T, Boolean> continuePredicate;

private ContinueWhile(Func1<T, Boolean> continuePredicate) {
this.continuePredicate = continuePredicate;
}

public static <T>ContinueWhile<T> create(Func1<T, Boolean> whileTrue){
return new ContinueWhile<>(whileTrue);
}

@Override
public Subscriber<? super T> call(Subscriber<? super T> child) {
ContinueWhileSubscriber parent = new ContinueWhileSubscriber(child);
child.add(parent);
return parent;
}

final class ContinueWhileSubscriber extends Subscriber<T> {

final Subscriber<? super T> actual;
Deque<T> buffer = new ConcurrentLinkedDeque<>();

public ContinueWhileSubscriber(Subscriber<? super T> actual) {
this.actual = actual;
}

@Override
public void onNext(T t) {
buffer.add(t);
if (continuePredicate.call(t)) {
while(!buffer.isEmpty())
actual.onNext(buffer.poll());
}
}

@Override
public void onError(Throwable e) {
buffer = null;
actual.onError(e);
}

@Override
public void onCompleted() {
while (!buffer.isEmpty())
actual.onNext(buffer.poll());
buffer=null;
actual.onCompleted();
}
}
}




public static void main(String[] args) {
BehaviorSubject<Integer> subject = BehaviorSubject.create();
subject.asObservable()
.doOnNext(v -> System.out.print("next "))
.lift(ContinueWhile.create(i -> i%3==0))
.subscribe(v -> System.out.print(v + ", "));
for (int i = 0; i < 10; i++) {
subject.onNext(i);
}
}
}

感谢 AndroidEx。

关于java - RXJava 暂停缓冲区,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36385194/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com