obs = Observable.from(constituents.entrySet()); Subscriber> sub = new-6ren">
gpt4 book ai didi

java - 使顺序处理异步,而不使用 "parallel"运算符

转载 作者:行者123 更新时间:2023-12-01 11:24:54 27 4
gpt4 key购买 nike

我现在正在研究这个 Java 方法:

Observable<Map.Entry<String, ConstituentInfo>> obs = Observable.from(constituents.entrySet());

Subscriber<Map.Entry<String, ConstituentInfo>> sub = new Subscriber<Map.Entry<String, ConstituentInfo>>(){
String securityBySymbol, company, symbol, companyName, marketPlace, countryName, tier, tierId;
ConstituentInfo constituent;
Integer compId;

@Override
public void onNext(Map.Entry<String, ConstituentInfo> entry) {
logger.info("blah blah test");
}

@Override
public void onCompleted() {
logger.info("completed successfully");
}

@Override
public void onError(Throwable throwable) {
logger.error(throwable.getMessage());
throwable.printStackTrace();
}
};

obs.observeOn(Schedulers.io()).subscribe(sub);

该方法本质上处理Map.Entry中的每个条目,但这似乎是按顺序处理它(同一线程)。在不使用“并行”运算符(即同时处理条目)的情况下,我将如何使该过程异步?我尝试运行上面的代码,但缺少一些结果(有些结果未正确处理)。

最佳答案

正如所报告的那样,您的可观察对象在主线程上运行,订阅者方法将由 Schedulers.io() 给出的一名工作人员调用,因此这就是它出现在一个线程上的原因。您的代码并不表明订阅者进行日志记录之外正在完成任何实际工作,因此这里无需异步执行任何操作。

也许你打算这样做?

obs.subscribeOn(Schedulers.io()).subscribe(sub);

就并行处理而言,如果您的最后一行是这样的:

obs.doOnNext(entry -> doWork(entry)).observeOn(Schedulers.io()).subscribe(sub);

然后您可以像这样异步完成 doWork 位:

int numProcessors = Runtime.getRuntime().availableProcessors();obs.buffer(Math.max(1, constituents.size()/numProcessors))   .flatMap(list -> Observable.from(list)                   .doOnNext(entry -> doWork(entry)                   .subscribeOn(Schedulers.computation())   .observeOn(Schedulers.io())   .subscribe(sub);

您需要缓冲处理器的工作,否则可能会发生大量线程上下文切换。

我不确定您为什么缺少结果。如果您有可重复的测试用例,请将代码粘贴到此处或将其作为问题报告给 github 上的 RxJava。

关于java - 使顺序处理异步,而不使用 "parallel"运算符,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30900575/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com