gpt4 book ai didi

java - RxJava - 通过另一个控制一个 Observable

转载 作者:行者123 更新时间:2023-11-30 10:19:12 28 4
gpt4 key购买 nike

已更新

我正在寻找一种方法来控制一个可被另一个观察到的流量。例如,让我们有 2 个单调递增(重要) 整数观测值:

source  : 1----2-2---2--3--3--4----4--5---6----8---9---10--------11------
control : -1----3----------------5-----------6-------9-----------12------

我需要生成一个新的可观察对象,其元素与源完全匹配,但它们的时间由控制可观察对象以下列方式控制:源值应始终小于或等于控制值。这意味着只有大于最近发布的控件的所有源值应该等到它们被控件“释放”

source         : 1----2-2---2--3--3--4----4--5---6----8---9---10--------11------
control : -1----3----------------5-----------6-------9-----------12------
expected result: -1----2-2--2--3--3-----4-4--5------6-------8-9---------10-11---

请看下面的代码示例:

private static <T, C> Observable<T> combine(Observable<T> source, Observable<C> control, BiFunction<T, C, Boolean> predicate) {
// ???
}

@Test
public void testControl() throws InterruptedException {
Subject<Integer> control = PublishSubject.create();
Observable<Integer> source = Observable.fromArray(1, 2, 2, 2, 3, 3, 4, 4, 5, 6, 8, 10, 11);
Observable<Integer> combined = combine(source, control, (s, c) -> s <= c);
control.subscribe(val -> System.out.println("Control: " + val));
combined.observeOn(Schedulers.io()).subscribe(val -> System.out.println("Value: " + val));

control.onNext(3); // should release 1,2,2,2,3,3
Thread.sleep(1000);
control.onNext(6); // should release 4,4,5,6
Thread.sleep(1000);
control.onNext(11); // should release 8,10,11
Thread.sleep(1000);
}

最佳答案

由于我没有找到任何优雅的解决方案,所以我最终自己实现了它。如果有人能提出更优雅的解决方案,我会很高兴(在这种情况下,我将不接受这个答案并接受更好的答案)。以下是我的解决方案:

private static <T, C> Observable<T> combine(Observable<T> source, Observable<C> control, BiFunction<T, C, Boolean> predicate) {
return Observable.create(emitter -> {
Queue<T> buffer = new ArrayDeque<>();
AtomicReference<C> lastControl = new AtomicReference<>();
CompletableSubject sourceCompletable = CompletableSubject.create();
CompletableSubject controlCompletable = CompletableSubject.create();
Disposable disposable = new CompositeDisposable(
control.subscribe(
val -> {
lastControl.set(val);
synchronized (buffer) {
while (!buffer.isEmpty() && predicate.apply(buffer.peek(), val)) {
emitter.onNext(buffer.poll());
}
}
},
emitter::onError,
controlCompletable::onComplete),
source.subscribe(
val -> {
C lastControlVal = lastControl.get();
synchronized (buffer) {
if (lastControlVal != null && predicate.apply(val, lastControlVal)) {
emitter.onNext(val);
} else {
buffer.add(val);
}
}
},
emitter::onError,
sourceCompletable::onComplete),
controlCompletable.andThen(sourceCompletable).subscribe(emitter::onComplete));
emitter.setDisposable(disposable);
});
}

关于java - RxJava - 通过另一个控制一个 Observable,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48767779/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com