gpt4 book ai didi

java - RxJava并行计算

转载 作者:行者123 更新时间:2023-12-02 12:14:32 25 4
gpt4 key购买 nike

我试图展示 RxJava 与顺序(我假设的)阻塞计算相比的性能。

我正在查看this postthis SO question 。根据经验,在处理计算而不是 I/O 时,使用 System.currentTimeMillis() 和 Thread.sleep() 进行基准测试不会产生一致的结果,因此我尝试设置一个简单的 JMH相反,基准测试。

我的基准测试计算两个整数并将它们相加:

public class MyBenchmark {

private Worker workerSequential;
private Worker workerParallel;

private int semiIntenseCalculation(int i) {
Double d = Math.tan(Math.atan(Math.tan(Math.atan(Math.tan(Math.tan(Math.atan(Math.tan(Math.atan(Math.tan(Math.atan(Math.tan(Math.tan(Math.atan(Math.tan(Math.atan(Math.tan(i)))))))))))))))));
return d.intValue() + i;
}

private int nonIntenseCalculation(int i) {
Double d = Math.tan(Math.atan(Math.tan(Math.atan(Math.tan(Math.tan(Math.atan(i)))))));
return d.intValue() + i;
}


private Observable<Object> intensiveObservable() {
return Observable.fromCallable(new Callable<Object>() {
@Override
public Object call() throws Exception {

int randomNumforSemi = ThreadLocalRandom.current().nextInt(0, 101);
Integer i = semiIntenseCalculation(randomNumforSemi);
int randomNumforNon = ThreadLocalRandom.current().nextInt(0, 101);
Integer j = nonIntenseCalculation(randomNumforNon);

return i+j;
}
});
};

private Observable<Object> semiIntensiveObservable() {
return Observable.fromCallable(new Callable<Object>() {
@Override
public Object call() throws Exception {
int randomNumforSemi = ThreadLocalRandom.current().nextInt(0, 101);
return semiIntenseCalculation(randomNumforSemi);
}
});
};

private Observable<Object> nonIntensiveObservable() {
return Observable.fromCallable(new Callable<Object>() {
@Override
public Object call() throws Exception {
int randomNumforNon = ThreadLocalRandom.current().nextInt(0, 101);
return nonIntenseCalculation(randomNumforNon);
}
});
};

public interface Worker {
void work();
}

@Setup
public void setup(final Blackhole bh) {

workerSequential = new Worker() {

@Override
public void work() {


Observable.just(intensiveObservable())
.subscribe(new Subscriber<Object>() {

@Override
public void onError(Throwable error) {

}

@Override
public void onCompleted() {

}

@Override
public void onNext(Object arg) {
bh.consume(arg);

}
});
}
};

workerParallel = new Worker() {
@Override
public void work() {
Observable.zip(semiIntensiveObservable().subscribeOn(Schedulers.computation()),
nonIntensiveObservable().subscribeOn(Schedulers.computation()),
new Func2<Object, Object, Object>() {

@Override
public Object call(Object semiIntensive, Object nonIntensive) {
return (Integer)semiIntensive + (Integer)nonIntensive;
}

}).subscribe(bh::consume);
}
};

}

@Benchmark
public void calculateSequential() {
workerSequential.work();
}

@Benchmark
public void calculateParallel() {
workerParallel.work();
}
}

我对结果感到困惑:

# Run complete. Total time: 00:00:21

Benchmark Mode Cnt Score Error Units
MyBenchmark.calculateParallel avgt 5 15602,176 ± 1663,650 ns/op
MyBenchmark.calculateSequential avgt 5 288,128 ± 6,982 ns/op

显然我希望并行计算能够更快。 RxJava 只适用于并行 I/O 或者为什么我会得到这些结果?

最佳答案

你做的基准测试是错误的。您应该等待并行工作完成,否则(通过 blockingSubscribe)您将启动大量并行工作,这会增加大量 GC 开销,并会增加执行器的内部队列。

Here是衡量各种并行工作的引用基准。请注意,分派(dispatch)工作本身就有开销,除非并行设置中每个工作项有 500 个以上的周期,否则您可能看不到此类 fork-join 类型并行工作负载的任何改进。

关于java - RxJava并行计算,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46284382/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com