gpt4 book ai didi

java - RxJava 中 REAL backpressure 的最佳实现

转载 作者:行者123 更新时间:2023-11-30 08:31:38 32 4
gpt4 key购买 nike

嗯,RxJava 中的背压并不是真正的背压,而只是忽略了一些元素集合。

但是如果我不能释放任何元素并且我需要以某种方式减慢发射速度怎么办?

RxJava不能影响元素发射,所以开发者需要自己实现。但是如何呢?

想到的最简单的方法是使用一些计数器,在发射时递增,在完成时递减。

像那样:

public static void sleep(int ms) {
try {
Thread.sleep(ms);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

public static void main(String[] args) throws InterruptedException {

AtomicInteger counter = new AtomicInteger();

Scheduler sA = Schedulers.from(Executors.newFixedThreadPool(1));
Scheduler sB = Schedulers.from(Executors.newFixedThreadPool(5));

Observable.create(s -> {
while (!s.isUnsubscribed()) {
if (counter.get() < 100) {
s.onNext(Math.random());
counter.incrementAndGet();
} else {
sleep(100);
}
}
}).subscribeOn(sA)
.flatMap(r ->
Observable.just(r)
.subscribeOn(sB)
.doOnNext(x -> sleep(1000))
.doOnNext(x -> counter.decrementAndGet())
)
.subscribe();
}

但是我觉得这种方式很差。有没有更好的解决方案?

最佳答案

Well, backpressure in RxJava is not real backpressure

RxJava的背压实现是后续生产者和消费者之间通过请求 channel 进行非阻塞协作。消费者通过 request() 请求一定数量的元素,而生产者通过 onNext 创建/生成/发送最多该数量的项目,有时在 之间有延迟onNexts.

but only ignoring some sets of elements.

只有当您明确告诉 RxJava 丢弃任何溢出时才会发生这种情况。

RxJava cannot affect element emition, so developer needs to implement it by himself. But how?

使用 Observable.create 需要了解如何实现非阻塞背压的高级知识,实际上不建议库用户使用。 RxJava 有很多方法可以让您轻松实现支持背压的流程:

Observable.range(1, 100)
.map(v -> Math.random())
.subscribeOn(sA)
.flatMap(v ->
Observable.just(v).subscribeOn(sB)
.doOnNext(x -> sleep(1000))
)
.subscribe();

Observable.create(SyncOnSubscribe.createStateless(
o -> o.onNext(Math.random())
)
.subscribeOn(sA)
...

关于java - RxJava 中 REAL backpressure 的最佳实现,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40562521/

32 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com