gpt4 book ai didi

java - Java中无限并行流图减少有时间限制的管道

转载 作者:太空宇宙 更新时间:2023-11-04 09:15:30 25 4
gpt4 key购买 nike

我正在尝试在 Java 中实现这种并发策略。我真的不在乎我使用的是流还是执行器,这里重要的是效率。

我想在给定的期限内并行运行尽可能多的模拟。单个模拟的运行速度快于超时时间。只要底层线程池有容量,就应该提交新的模拟。

+----------------+         +--------------+           +------------+
| | param | | Solution | |
| SimCoordinator +-------->+ Simulation +----+----->+ reduce(min)|
| | | | | | |
+-------+--------+ +--------------+ | +------------+
^ |
+--------------------------------------+

类看起来像:

class SimCoordinator {
double getParam(); //changes every time
double putSolution(Solution s);
}

class Simulation {
Solution run(double param); // takes a while to compute
}

协调器向任务提供参数。

然后运行任务并获得解决方案。

解决方案必须反馈给协调器,这将改变参数的生成方式。

每个解决方案都有成本;我想以最小的成本保存解决方案,并在运行模拟时执行此操作,而不是在截止日期之后。

Simulation.run() 是一个耗时的操作,因此它应该与其他模拟同时运行。假设我有一个包含 N 个线程的线程池(或者并行流正在工作)。最初 simcoordinator 为所有线程提供模拟,当模拟完成时,它会创建新的线程。

解决此类问题的最佳方法是什么?

最佳答案

所以本质上你所拥有的是

  • 模拟 - a Function<Double, Double>
  • 根据旧参数及其结果计算新参数,即 BiFunction<Double, Double, Double>
  • 记录最低参数/分数集的评估,即 Consumer<Double, Double> .
  • 计时器耗尽,阻止新的模拟开始。

所以总而言之,你可以拥有类似的东西

class SimulationDriver {
private ExecutorService service;
AtomicBoolean finished = new AtomicBoolean();
private ScheduledExecutorService timer = Executors.newScheduledThreadPool(1);

private Function<Double, Double> simulation;
private BiFunction<Double, Double, Double> calculateNewParameter;

private Map<Double, Double> results = new ConcurrentHashMap<>();

private SimulationDriver(Function<Double, Double> sim,
BiFunction<Double, Double, Double> newParam) {
simulation = sim;
calculateNewParameter = newParam;
}

public void run(int timeout, Runnable callback, Double... params) {
if (service != null) throw new InvalidStateException("already running");

// initial simulations, one in each thread
service = Executors.newFixedThreadPool(params.length);
for (Double d : params) {
submitForSimulation(d);
}
// after timeout: set finished to prevent new simulations and notify caller
timer.schedule(() -> {
finished.set(true);
service.shutdown();
callback.run(); // tell outside world we're finished
}, timeout, ChronoUnit.SECONDS);
}

void submitForSimulation(Double d) {
if (!finished.get()) { // don't start new simulations after time ran out
CompletableFuture.supplyAsync(() -> simulation.apply(d), service);
result.thenAccept(r -> results.put(d, r)); // store param/result in map
// calculate next parameter and run simulation with that
result.thenAccept(r -> calculateNewParameter.apply(d, r))
.thenAccept(this::submitForSimulation);
}
}
}

关于java - Java中无限并行流图减少有时间限制的管道,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59069046/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com