gpt4 book ai didi

java - 仅对最新的异步更新数据运行计算

转载 作者:行者123 更新时间:2023-12-03 17:00:37 26 4
gpt4 key购买 nike

Java 奇才!

我想用 Java 尽可能高效地实现以下要求。每一微秒都很重要。

tl;dr 版本 是我有一些计算需要在 数据上运行。一旦数据发生变化,计算就需要运行。如果在计算完成之前数据发生变化,则需要取消计算并重新开始计算最新的数据。

详细说明:

  • 我有 N 个异步更新的新数据源。将它们称为 DataSource 类的实例(DataSource ds1 = new DataSource();DataSource ds2 = new DataSource(); 等)
  • DataSource 的公共(public)方法 getNewData() 返回可用的新数据,否则线程阻塞直到有新数据。
  • GlobalState 成为任何给定时刻流的所有状态的快照。每当任何流更新时,GlobalState 都会发生变化。换句话说,GlobalState 始终 拥有所有流数据的最新信息。如果 Java 是通过引用传递的,人们可能会想象按如下方式实例化一个 GlobalState:GlobalState gs = new GlobalState(ds1.datum, ds2.datum, ...);
  • 一旦 GlobalState 发生变化(由于其中一个流更新),一项工作就会启动,这可能需要一些时间。如果作业在 GlobalState 再次更改之前完成,很好,我们保存结果,然后等待它更改,然后在新状态上工作,无限期。如果它没有GlobalState 再次更改之前完成,则该作业将被取消,并为新状态启动一个新作业。

我最好的猜测:

public class App {
public static void main(String[] args) {

DataSource ds1 = new DataSource(...);
DataSource ds2 = new DataSource(...);
GlobalState gs = new GlobalState(ds1, ds2);

ds1.start(); // runs and updates its data asynchronously
ds2.start(); // runs and updates its data asynchronously

Worker worker = new Worker();

while(true) {
try{
GlobalDataState gds = gs.getState(); // this blocks if the state isn't different from when the method was last called.
Future result = worker.doWork(gds); // work happening in a different thread.
System.out.println("Result is: " + result.get()); // blocks until its result is available or cancelled.
} catch (CancellationException ce) {
System.err.println("Workers too slow! Starting over on new data.");
}
}
}
}

public class Worker {

private Future pendingResult;
private final ExecutorService exec;

public Worker() {
this.exec = Executors.newFixedThreadPool(2);
}

public Future doWork(GlobalDataState gds) { // GlobalDataState implements Callable
// cancels jobs that hadn't finished yet.
if (pendingResult != null ) {
if (!pendingResult.isDone()) {
pendingResult.cancel(true);
}
}
pendingResult = exec.submit(gds);

return pendingResult;
}

}

我遇到的主要问题是弄清楚如何以不需要我在循环中轮询新数据的方式实现 GlobalState。我在想这样做的方法是使用容量为 1 的阻塞队列,(SynchronousQueue, ArrayBlockingQueue(1), ...?) 但我只希望它阻止 take() 而不是 put()。如果主线程在调用 gs.getState() 时被阻塞,我们就不能让这个阻塞程序中添加新的 GlobalDataState 的部分元素队列。另一方面,如果数据更新速度快于我的工作人员可以处理的速度,我不希望旧数据在此队列中等待。如果队列中有一个 GlobalDataState 对象,并且正在提供另一个对象,则它需要驱逐坐在那里的对象并添加新对象。这样,无论何时主线程开始调用 gs.getState(),它绝对是最新的信息。

我也考虑过使用 Phaser 来管理计算的进展,但我的每一次尝试似乎都很困惑。

所以这是我最好的猜测。我将不胜感激有关数据结构和/或设计模式的任何建议,以最好地实现应用程序目标。请记住,每一微秒都很重要。

谢谢!

最佳答案

我只理解你的“tl;dr 版本”,但这应该很简单:每当有新数据出现时,你的一些同步方法就会被调用(这必须是可能的,否则你无法更新状态)。在这个方法中你可以取消之前计算对应的future对象,提交你的新计算,用新的future对象替换之前的future对象。仅此而已,您不需要队列或类似队列的容器。

关于java - 仅对最新的异步更新数据运行计算,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28956027/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com