gpt4 book ai didi

java - RxJava flatMap 和背压奇怪的行为

转载 作者:塔克拉玛干 更新时间:2023-11-01 23:08:32 25 4
gpt4 key购买 nike

在使用 RxJava 编写数据同步作业时,我发现了一个我无法解释的奇怪行为。我是 RxJava 的新手,希望得到帮助。

简而言之,我的工作非常简单,我有一个元素 ID 列表,我调用 Web 服务通过 ID 获取每个元素,进行一些处理并多次调用以将数据推送到数据库。数据加载比数据存储快,所以我遇到了 OutOfMemory 错误。

我的代码看起来很像“失败”测试,但在进行一些测试后我意识到删除该行:

flatMap(dt -> Observable.just(dt))

让它发挥作用。失败的测试输出清楚地表明未消耗的项目堆积起来,这会导致 OutOfMemory。工作测试输出显示生产者将始终等待消费者,因此这永远不会导致 OutOfMemory。

public static class DataStore {
public Integer myVal;
public byte[] myBigData;

public DataStore(Integer myVal) {
this.myVal = myVal;
this.myBigData = new byte[1000000];
}
}

@Test
public void working() {
int MAX_CONCURRENT_LOAD = 1;
int MAX_CONCURRENT_STORE = 2;

AtomicInteger nbUnconsumed = new AtomicInteger(0);

List<Integer> ids = IntStream.range(0, 1000).boxed().collect(Collectors.toList());
Observable.from(ids)
.flatMap(this::produce, MAX_CONCURRENT_LOAD)
.doOnNext(s -> logger.info("+1 Total unconsumed values: " + nbUnconsumed.incrementAndGet()))
.flatMap(this::consume, MAX_CONCURRENT_STORE)
.doOnNext(s -> logger.info("-1 Total unconsumed values: " + nbUnconsumed.decrementAndGet()))
.toBlocking().forEach(s -> {});

logger.info("Finished");
}

@Test
public void failing() {
int MAX_CONCURRENT_LOAD = 1;
int MAX_CONCURRENT_STORE = 2;

AtomicInteger nbUnconsumed = new AtomicInteger(0);

List<Integer> ids = IntStream.range(0, 1000).boxed().collect(Collectors.toList());
Observable.from(ids)
.flatMap(this::produce, MAX_CONCURRENT_LOAD)
.doOnNext(s -> logger.info("+1 Total unconsumed values: " + nbUnconsumed.incrementAndGet()))
.flatMap(dt -> Observable.just(dt))
.flatMap(this::consume, MAX_CONCURRENT_STORE)
.doOnNext(s -> logger.info("-1 Total unconsumed values: " + nbUnconsumed.decrementAndGet()))
.toBlocking().forEach(s -> {});

logger.info("Finished");
}

private Observable<DataStore> produce(final int value) {
return Observable.<DataStore>create(s -> {
try {
if (!s.isUnsubscribed()) {
Thread.sleep(200); //Here I synchronous call WS to retrieve data
s.onNext(new DataStore(value));
s.onCompleted();
}
} catch (Exception e) {
s.onError(e);
}
}).subscribeOn(Schedulers.io());
}

private Observable<Boolean> consume(DataStore value) {
return Observable.<Boolean>create(s -> {
try {
if (!s.isUnsubscribed()) {
Thread.sleep(1000); //Here I synchronous call DB to store data
s.onNext(true);
s.onCompleted();
}
} catch (Exception e) {
s.onNext(false);
s.onCompleted();
}
}).subscribeOn(Schedulers.io());
}

这种行为背后的解释是什么?我如何在不删除 Observable.just(dt)) 的情况下解决失败的测试,在我的真实情况下,它是 Observable.from(someListOfItme)

最佳答案

flatMap 默认合并无限量的数据源,通过应用不带 maxConcurrent 参数的特定 lambda,你基本上解除了上游的限制,现在可以全速运行,压倒了其他运营商的内部缓冲区.

关于java - RxJava flatMap 和背压奇怪的行为,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35322480/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com