gpt4 book ai didi

java - Rx java 内存不足

转载 作者:行者123 更新时间:2023-12-01 10:27:02 26 4
gpt4 key购买 nike

已编辑:请参阅这个问题,该问题更加清晰和准确: RxJava flatMap and backpressure strange behavior

我目前正在使用 RxJava 编写数据同步作业,对于响应式(Reactive)编程,特别是 RxJava 库,我还是新手。

我的工作非常简单,我有一个元素 ID 列表,我调用一个 Web 服务来通过 ID 获取每个元素,进行一些处理并进行多次调用以将数据推送到数据库。

我使用1个io线程从WS加载数据,并使用多个io线程将数据推送到DB。但是我总是会遇到 OutOfMemory 错误。我首先认为从 WS 加载数据比将数据存储在 DB 中更快。

但是,由于 WS 调用和 DB 调用都是同步调用,因此它们是否应该互相施加背压?

感谢您的帮助。

我的代码几乎是这样的:

@Test
public void test() {
int MAX_CONCURRENT_LOAD = 1;
int MAX_CONCURRENT_STORE = 2;

List<Integer> ids = IntStream.range(0, 10000).boxed().collect(Collectors.toList());
Observable.from(ids)
.flatMap(this::produce, MAX_CONCURRENT_LOAD)
.flatMap(this::consume, MAX_CONCURRENT_STORE)
.toBlocking().forEach(s -> System.out.println("Value " + s));

System.out.println("Finished");
}

private Observable<Integer> produce(final int value) {
return Observable.<Integer>create(s -> {
try {
if (!s.isUnsubscribed()) {
Thread.sleep(500); //Here I call WS to retrieve data
s.onNext(value);
s.onCompleted();
}
} catch (Exception e) {
s.onError(e);
}
}).subscribeOn(Schedulers.io());
}

private Observable<Boolean> consume(Integer value) {
return Observable.<Boolean>create(s -> {
try {
if (!s.isUnsubscribed()) {
Thread.sleep(10000); //Here I call DB to store data
s.onNext(true);
s.onCompleted();
}
} catch (Exception e) {
s.onNext(false);
s.onCompleted();
}
}).subscribeOn(Schedulers.io());
}

最佳答案

看来你的WS是基于轮询的,所以如果你使用fromCallable而不是你的自定义Observable,你会得到适当的背压:

return Observable.<Integer>fromCallabe(s -> {
Thread.sleep(500); //Here I call WS to retrieve data
return value;
}).subscribeOn(Schedulers.io());

否则,如果您有阻塞 WS 和阻塞数据库,则可以使用它们相互反压:

ids.map(id -> db.store(ws.get(id)).subscribeOn(Schedulers.io())
.toBlocking().subscribe(...)

并且还可能省略 subscribeOn 和 toBlocking。

关于java - Rx java 内存不足,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35316897/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com