gpt4 book ai didi

java - 如果 ES 上的项目可以重新提交给 ES,我怎么知道 ExecutorService 何时完成

转载 作者:行者123 更新时间:2023-11-30 07:41:02 24 4
gpt4 key购买 nike

我的 Java 应用程序处理文件夹中的音乐文件,它旨在并行且独立地处理多个文件夹。为此,每个文件夹都由 ExecutorService 处理,该服务的最大池大小与计算机的 CPU 数量不匹配。

例如,如果我们有 8 个 CPU 的计算机,那么(理论上)可以同时处理 8 个文件夹,如果我们有 16 个 CPU 的计算机,则可以同时处理 16 个文件夹。如果我们只有 1 个 CPU,那么我们将 pool-size 设置为 3,以允许 CPU 在一个文件夹阻塞 I/O 时继续执行某些操作。

但是,我们实际上并没有只有一个 ExecutorService,我们有多个,因为每个文件夹都可以经历多个阶段。

Process1(使用ExecutorService1)→ Process2(ExecutorService2)→ Process3(ExecutorService3)

进程 1、2、3 等都实现了 Callable,并且都有自己关联的 ExecutorService。我们启动了一个 FileLoader 进程,它加载文件夹,然后为每个文件夹创建一个 Process1 可调用对象并提交给 Process1 执行程序,对于每个 Process1 可调用对象,它将完成其工作,然后提交给不同的可调用对象,这可能是 Process2、Process3等等,但我们永远不会倒退,例如 Process3 永远不会提交给 Process1。我们实际上有 12 个进程,但任何特定文件夹都不太可能经历所有 12 个进程

但我意识到这是有缺陷的,因为在 16 CPU 计算机的情况下,每个 ES 的池大小可以为 16,所以我们实际上有 48 个线程在运行,这只会导致过多的争用。

所以我要做的是让所有进程(Process1、Process2...)使用相同的 ExecutorService,这样我们就只有与 CPU 匹配的工作线程。

但是,在我目前的情况下,我们有一个 SongLoader 进程,它只提交了一个任务(加载所有文件夹),然后我们调用 shutdown(),直到所有内容都提交给 Process0,然后关闭才会完成() 在 Process0 上不会成功,直到所有内容都发送到 Process1 等等。

 //Init Services
services.add(songLoaderService);
services.add(Process1.getExecutorService());
services.add(Process2.getExecutorService());
services.add(Process3.getExecutorService());

for (ExecutorService service : services)
//Request Shutdown
service.shutdown();

//Now wait for all submitted tasks to complete
service.awaitTermination(10, TimeUnit.DAYS);
}
//...............
//Finish Off work

但是,如果一切都在同一个 ES 上并且 Process1 正在提交给 Process2,这将不再有效,因为在调用 shutdown() 时,并不是 Process1 会提交给 Process2 的所有文件夹,因此它会过早关闭。

那么当该 ES 上的任务可以提交给同一 ES 上的其他任务时,我如何使用单个 ExecutorService 检测所有工作何时完成?

或者有更好的方法吗?

注意,你可能会想他为什么不直接把Process1,2 & 3的逻辑合并成一个Process。困难在于,虽然我最初是按文件夹对歌曲进行分组,但有时歌曲会被分成更小的组,并且它们会被分配到生产线上的单独进程,而且不一定是同一进程,实际上总共有 12 个进程。

基于 Sholms 想法的尝试

主线程

    private static List<Future> futures = Collections.synchronizedList(new ArrayList<Future>());
private static AnalyserService analyserService = new MainAnalyserService(SongKongThreadGroup.THREAD_WORKER);
...
SongLoader loader = SongLoader.getInstanceOf(parentFolder);
ExecutorService songLoaderService = SongLoader.getExecutorService();
songLoaderService.submit(loader);
for(Future future : futures)
{
try
{
future.get();
}
catch (InterruptedException ie)
{
SongKong.logger.warning(">>>>>> Interrupted - shutting down tasks immediately");
getAnalyserService().getExecutorService().awaitTermination(30, TimeUnit.SECONDS);
}
catch(ExecutionException e)
{
SongKong.logger.log(Level.SEVERE, ">>>>>> ExecutionException:"+e.getMessage(), e);
}
}
songLoaderService.shutdown();

使用 MainAnalyserService 中的此函数提交新任务的流程代码

public void submit(Callable<Boolean> task) //throws Exception
{
FixSongsController.getFutures().add(getExecutorService().submit(task));
}

看起来好像可以,但失败了

java.util.ConcurrentModificationException
at java.base/java.util.ArrayList$Itr.checkForComodification(Unknown Source)
at java.base/java.util.ArrayList$Itr.next(Unknown Source)
at com.jthink.songkong.analyse.toplevelanalyzer.FixSongsController.start(FixSongsController.java:220)
at com.jthink.songkong.ui.swingworker.FixSongs.doInBackground(FixSongs.java:49)
at com.jthink.songkong.ui.swingworker.FixSongs.doInBackground(FixSongs.java:18)
at java.desktop/javax.swing.SwingWorker$1.call(Unknown Source)
at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
at java.desktop/javax.swing.SwingWorker.run(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.base/java.lang.Thread.run(Unknown Source)

现在我意识到我不能让一个线程调用 future.get()(等待完成),同时其他线程正在添加到列表中。

最佳答案

我同意 Shloim 的观点,您在这里不需要多个 ExecutorService 实例——只需一个(根据您可用的 CPU 数量调整大小)就足够了,而且实际上是最佳的。实际上,我认为您可能不需要 ExecutorService;如果您使用信号完整性的外部机制,一个简单的 Executor 就可以完成这项工作。

我会先构建一个类来表示整个较大的工作项。如果您需要使用每个子工作项的结果,您可以使用队列,但如果您只想知道是否还有工作要做,您只需要一个计数器。

例如,您可以这样做:

public class FolderWork implements Runnable {
private final Executor executor;
private final File folder;

private int pendingItems; // guarded by monitor lock on this instance

public FolderWork(Executor executor, File folder) {
this.executor = executor;
this.folder = folder;
}

@Override
public void run() {
for (File file : folder.listFiles()) {
enqueueMoreWork(file);
}
}

public synchronized void enqueueMoreWork(File file) {
pendingItems++;
executor.execute(new FileWork(file, this));
}

public synchronized void markWorkItemCompleted() {
pendingItems--;
notifyAll();
}

public synchronized boolean hasPendingWork() {
return pendingItems > 0;
}

public synchronized void awaitCompletion() {
while (pendingItems > 0) {
wait();
}
}
}

public class FileWork implements Runnable {
private final File file;
private final FolderWork parent;

public FileWork(File file, FolderWork parent) {
this.file = file;
this.parent = parent;
}

@Override
public void run() {
try {
// do some work with the file

if (/* found more work to do */) {
parent.enqueueMoreWork(...);
}
} finally {
parent.markWorkItemCompleted();
}
}
}

如果您担心 pendingItems 计数器的同步开销,您可以改用 AtomicInteger。然后你需要一个单独的机制来通知等待线程我们已经完成了;例如,您可以使用 CountDownLatch。这是一个示例实现:

public class FolderWork implements Runnable {
private final Executor executor;
private final File folder;

private final AtomicInteger pendingItems = new AtomicInteger(0);
private final CountDownLatch latch = new CountDownLatch(1);

public FolderWork(Executor executor, File folder) {
this.executor = executor;
this.folder = folder;
}

@Override
public void run() {
for (File file : folder.listFiles()) {
enqueueMoreWork(file);
}
}

public void enqueueMoreWork(File file) {
if (latch.getCount() == 0) {
throw new IllegalStateException(
"Cannot call enqueueMoreWork() again after awaitCompletion() returns!");
}
pendingItems.incrementAndGet();
executor.execute(new FileWork(file, this));
}

public void markWorkItemCompleted() {
int remainingItems = pendingItems.decrementAndGet();
if (remainingItems == 0) {
latch.countDown();
}
}

public boolean hasPendingWork() {
return pendingItems.get() > 0;
}

public void awaitCompletion() {
latch.await();
}
}

你可以这样调用它:

Executor executor = Executors.newCachedThreadPool(...);
FolderWork topLevel = new FolderWork(executor, new File(...));
executor.execute(topLevel);
topLevel.awaitCompletion();

此示例仅显示一级子工作项,但您可以使用任意数量的子工作项,只要它们都使用相同的 pendingItems 计数器来跟踪剩余工作量做。

关于java - 如果 ES 上的项目可以重新提交给 ES,我怎么知道 ExecutorService 何时完成,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56617083/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com