gpt4 book ai didi

java - 转换 observable 以在值之间具有最小延迟

转载 作者:行者123 更新时间:2023-11-29 06:54:24 24 4
gpt4 key购买 nike

我正在寻找 this question 的良好解决方案但在 RxJava 中实现。这个问题也有五年多了,所以我想知道 - 是否有更好的方法来实现此输出?

What I'm trying to achieve is to buffer incoming events from someIObservable ( they come in bursts) and release them further, but oneby one, in even intervals. Like this:

-oo-ooo-oo------------------oooo-oo-o-------------->

-o--o--o--o--o--o--o--------o--o--o--o--o--o--o---->

对我来说最大的要求是没有观察到的东西丢失,并且事件的顺序保持不变。

最佳答案

此特定模式需要记住上一个事件的安排时间,因此如果下一个事件在一段时间后发生,它可以立即发出并开始新的周期性发出。也许更简单有效的方法是编写自定义运算符:

import java.util.concurrent.TimeUnit;

import rx.*;
import rx.Observable.Operator;
import rx.schedulers.Schedulers;

public class SpanOut<T> implements Operator<T, T> {
final long time;

final TimeUnit unit;

final Scheduler scheduler;

public SpanOut(long time, TimeUnit unit, Scheduler scheduler) {
this.time = time;
this.unit = unit;
this.scheduler = scheduler;
}

@Override
public Subscriber<? super T> call(Subscriber<? super T> t) {
Scheduler.Worker w = scheduler.createWorker();

SpanSubscriber<T> parent = new SpanSubscriber<>(t, unit.toMillis(time), w);

t.add(w);
t.add(parent);

return parent;
}

static final class SpanSubscriber<T> extends Subscriber<T> {
final Subscriber<? super T> actual;

final long spanMillis;

final Scheduler.Worker worker;

long lastTime;

public SpanSubscriber(Subscriber<? super T> actual,
long spanMillis, Scheduler.Worker worker) {
this.actual = actual;
this.spanMillis = spanMillis;
this.worker = worker;
}

@Override
public void onNext(T t) {
long now = worker.now();
if (now >= lastTime + spanMillis) {
lastTime = now + spanMillis;
worker.schedule(() -> {
actual.onNext(t);
});
} else {
long next = lastTime - now;
lastTime += spanMillis;
worker.schedule(() -> {
actual.onNext(t);
}, next, TimeUnit.MILLISECONDS);
}
}

@Override
public void onError(Throwable e) {
worker.schedule(() -> {
actual.onError(e);
unsubscribe();
});
}

@Override
public void onCompleted() {
long next = lastTime - worker.now();
worker.schedule(() -> {
actual.onCompleted();
unsubscribe();
}, next, TimeUnit.MILLISECONDS);
}

@Override
public void setProducer(Producer p) {
actual.setProducer(p);
}
}

public static void main(String[] args) {
Observable.range(1, 5)
.concatWith(Observable.just(6).delay(6500, TimeUnit.MILLISECONDS))
.concatWith(Observable.range(7, 4))
.lift(new SpanOut<>(1, TimeUnit.SECONDS, Schedulers.computation()))
.timeInterval()
.toBlocking()
.subscribe(System.out::println);
}
}

关于java - 转换 observable 以在值之间具有最小延迟,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37445963/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com