- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我的 Java 应用程序处理文件夹中的音乐文件,它旨在并行且独立地处理多个文件夹。为此,每个文件夹都由 ExecutorService 处理,该服务的最大池大小与计算机的 CPU 数量不匹配。
例如,如果我们有 8 个 CPU 的计算机,那么(理论上)可以同时处理 8 个文件夹,如果我们有 16 个 CPU 的计算机,则可以同时处理 16 个文件夹。如果我们只有 1 个 CPU,那么我们将 pool-size 设置为 3,以允许 CPU 在一个文件夹阻塞 I/O 时继续执行某些操作。
但是,我们实际上并没有只有一个 ExecutorService,我们有多个,因为每个文件夹都可以经历多个阶段。
Process1(使用ExecutorService1)→ Process2(ExecutorService2)→ Process3(ExecutorService3)
进程 1、2、3 等都实现了 Callable,并且都有自己关联的 ExecutorService。我们启动了一个 FileLoader 进程,它加载文件夹,然后为每个文件夹创建一个 Process1 可调用对象并提交给 Process1 执行程序,对于每个 Process1 可调用对象,它将完成其工作,然后提交给不同的可调用对象,这可能是 Process2、Process3等等,但我们永远不会倒退,例如 Process3 永远不会提交给 Process1。我们实际上有 12 个进程,但任何特定文件夹都不太可能经历所有 12 个进程
但我意识到这是有缺陷的,因为在 16 CPU 计算机的情况下,每个 ES 的池大小可以为 16,所以我们实际上有 48 个线程在运行,这只会导致过多的争用。
所以我要做的是让所有进程(Process1、Process2...)使用相同的 ExecutorService,这样我们就只有与 CPU 匹配的工作线程。
但是,在我目前的情况下,我们有一个 SongLoader 进程,它只提交了一个任务(加载所有文件夹),然后我们调用 shutdown(),直到所有内容都提交给 Process0,然后关闭才会完成() 在 Process0 上不会成功,直到所有内容都发送到 Process1 等等。
//Init Services
services.add(songLoaderService);
services.add(Process1.getExecutorService());
services.add(Process2.getExecutorService());
services.add(Process3.getExecutorService());
for (ExecutorService service : services)
//Request Shutdown
service.shutdown();
//Now wait for all submitted tasks to complete
service.awaitTermination(10, TimeUnit.DAYS);
}
//...............
//Finish Off work
但是,如果一切都在同一个 ES 上并且 Process1 正在提交给 Process2,这将不再有效,因为在调用 shutdown() 时,并不是 Process1 会提交给 Process2 的所有文件夹,因此它会过早关闭。
那么当该 ES 上的任务可以提交给同一 ES 上的其他任务时,我如何使用单个 ExecutorService 检测所有工作何时完成?
或者有更好的方法吗?
注意,你可能会想他为什么不直接把Process1,2 & 3的逻辑合并成一个Process。困难在于,虽然我最初是按文件夹对歌曲进行分组,但有时歌曲会被分成更小的组,并且它们会被分配到生产线上的单独进程,而且不一定是同一进程,实际上总共有 12 个进程。
基于 Sholms 想法的尝试
主线程
private static List<Future> futures = Collections.synchronizedList(new ArrayList<Future>());
private static AnalyserService analyserService = new MainAnalyserService(SongKongThreadGroup.THREAD_WORKER);
...
SongLoader loader = SongLoader.getInstanceOf(parentFolder);
ExecutorService songLoaderService = SongLoader.getExecutorService();
songLoaderService.submit(loader);
for(Future future : futures)
{
try
{
future.get();
}
catch (InterruptedException ie)
{
SongKong.logger.warning(">>>>>> Interrupted - shutting down tasks immediately");
getAnalyserService().getExecutorService().awaitTermination(30, TimeUnit.SECONDS);
}
catch(ExecutionException e)
{
SongKong.logger.log(Level.SEVERE, ">>>>>> ExecutionException:"+e.getMessage(), e);
}
}
songLoaderService.shutdown();
使用 MainAnalyserService 中的此函数提交新任务的流程代码
public void submit(Callable<Boolean> task) //throws Exception
{
FixSongsController.getFutures().add(getExecutorService().submit(task));
}
看起来好像可以,但失败了
java.util.ConcurrentModificationException
at java.base/java.util.ArrayList$Itr.checkForComodification(Unknown Source)
at java.base/java.util.ArrayList$Itr.next(Unknown Source)
at com.jthink.songkong.analyse.toplevelanalyzer.FixSongsController.start(FixSongsController.java:220)
at com.jthink.songkong.ui.swingworker.FixSongs.doInBackground(FixSongs.java:49)
at com.jthink.songkong.ui.swingworker.FixSongs.doInBackground(FixSongs.java:18)
at java.desktop/javax.swing.SwingWorker$1.call(Unknown Source)
at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
at java.desktop/javax.swing.SwingWorker.run(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.base/java.lang.Thread.run(Unknown Source)
现在我意识到我不能让一个线程调用 future.get()(等待完成),同时其他线程正在添加到列表中。
最佳答案
我同意 Shloim 的观点,您在这里不需要多个 ExecutorService
实例——只需一个(根据您可用的 CPU 数量调整大小)就足够了,而且实际上是最佳的。实际上,我认为您可能不需要 ExecutorService
;如果您使用信号完整性的外部机制,一个简单的 Executor
就可以完成这项工作。
我会先构建一个类来表示整个较大的工作项。如果您需要使用每个子工作项的结果,您可以使用队列,但如果您只想知道是否还有工作要做,您只需要一个计数器。
例如,您可以这样做:
public class FolderWork implements Runnable {
private final Executor executor;
private final File folder;
private int pendingItems; // guarded by monitor lock on this instance
public FolderWork(Executor executor, File folder) {
this.executor = executor;
this.folder = folder;
}
@Override
public void run() {
for (File file : folder.listFiles()) {
enqueueMoreWork(file);
}
}
public synchronized void enqueueMoreWork(File file) {
pendingItems++;
executor.execute(new FileWork(file, this));
}
public synchronized void markWorkItemCompleted() {
pendingItems--;
notifyAll();
}
public synchronized boolean hasPendingWork() {
return pendingItems > 0;
}
public synchronized void awaitCompletion() {
while (pendingItems > 0) {
wait();
}
}
}
public class FileWork implements Runnable {
private final File file;
private final FolderWork parent;
public FileWork(File file, FolderWork parent) {
this.file = file;
this.parent = parent;
}
@Override
public void run() {
try {
// do some work with the file
if (/* found more work to do */) {
parent.enqueueMoreWork(...);
}
} finally {
parent.markWorkItemCompleted();
}
}
}
如果您担心 pendingItems
计数器的同步开销,您可以改用 AtomicInteger
。然后你需要一个单独的机制来通知等待线程我们已经完成了;例如,您可以使用 CountDownLatch
。这是一个示例实现:
public class FolderWork implements Runnable {
private final Executor executor;
private final File folder;
private final AtomicInteger pendingItems = new AtomicInteger(0);
private final CountDownLatch latch = new CountDownLatch(1);
public FolderWork(Executor executor, File folder) {
this.executor = executor;
this.folder = folder;
}
@Override
public void run() {
for (File file : folder.listFiles()) {
enqueueMoreWork(file);
}
}
public void enqueueMoreWork(File file) {
if (latch.getCount() == 0) {
throw new IllegalStateException(
"Cannot call enqueueMoreWork() again after awaitCompletion() returns!");
}
pendingItems.incrementAndGet();
executor.execute(new FileWork(file, this));
}
public void markWorkItemCompleted() {
int remainingItems = pendingItems.decrementAndGet();
if (remainingItems == 0) {
latch.countDown();
}
}
public boolean hasPendingWork() {
return pendingItems.get() > 0;
}
public void awaitCompletion() {
latch.await();
}
}
你可以这样调用它:
Executor executor = Executors.newCachedThreadPool(...);
FolderWork topLevel = new FolderWork(executor, new File(...));
executor.execute(topLevel);
topLevel.awaitCompletion();
此示例仅显示一级子工作项,但您可以使用任意数量的子工作项,只要它们都使用相同的 pendingItems
计数器来跟踪剩余工作量做。
关于java - 如果 ES 上的项目可以重新提交给 ES,我怎么知道 ExecutorService 何时完成,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56617083/
从 Redis 获取消息时,onDone:(){print('done')} 从未起作用。 import 'package:dartis/dartis.dart' as redis show PubS
昨天我玩了一些vim脚本,并设法通过循环来对当前输入的内容进行状态栏预测(请参见屏幕截图(灰色+黄色栏))。 问题是,我不记得我是怎么得到的,也找不到我用于该vim魔术的代码片段(我记得它很简单):它
我尝试加载 bash_completion在我的 bash (3.2.25) 中,它不起作用。没有消息等。我在我的 .bashrc 中使用了以下内容 if [ -f ~/.bash_completio
我正在尝试构建一个 bash 完成例程,它将建议命令行标志和合适的标志值。例如在下面 fstcompose 命令我想比赛套路先建议 compose_filter= 标志,然后建议来自 [alt_seq
当我尝试在重定向符号后完成路径时,bash 完成的行为就好像它仍在尝试在重定向之前完成命令的参数一样。 例如: dpkg -l > /med标签 通过在 /med 之后点击 Tab我希望它完成通往 /
我的类中有几个 CAKeyframeAnimation 对象。 他们都以 self 为代表。 在我的animationDidStop函数中,我如何知道调用来自哪里? 是否有任何变量可以传递给 CAKe
我有一个带有 NSDateFormatter 的 NSTextField。格式化程序接受“mm/dd/yy”。 可以自动补全日期吗?因此,用户可以输入“mm”,格式化程序将完成当前月份和年份。 最佳答
有一个解决方案可以使用以下方法完成 NSTextField : - (NSArray *)control:(NSControl *)control textView:(NSTextView *)tex
我正在阅读 Passport 的文档,我注意到 serialize()和 deserialize() done()被调用而不被返回。 但是,当使用 passport.use() 设置新策略时在回调函数
在 ubuntu 11.10 上的 Firefox 8.0 中,尽管 img.complete 为 false,但仍会调用 onload 函数 draw。我设法用 setTimeout hack 解决
假设我有两个与两个并行执行的计算相对应的 future 。我如何等到第一个 future 准备好?理想情况下,我正在寻找类似于Python asyncio's wait且参数为return_when=
我正在寻找一种 Java 7 数据结构,其行为类似于 java.util.Queue,并且还具有“最终项目已被删除”的概念。 例如,应可以表达如下概念: while(!endingQueue.isFi
这是一个简单的问题。 if ($('.dataTablePageList')) { 我想做的是执行一个 if 语句,该语句表示如果具有 dataTablesPageList 类的对象也具有 menu
我用replaceWith批量替换了许多div中的html。替换后,我使用 jTruncate 来截断文本。然而它不起作用,因为在执行时,replaceWith 还没有完成。 我尝试了回调技巧 ( H
有没有办法调用 javascript 表单 submit() 函数或 JQuery $.submit() 函数并确保它完成提交过程?具体来说,在一个表单中,我试图在一个 IFrame 中提交一个表单。
我有以下方法: function animatePortfolio(fadeElement) { fadeElement.children('article').each(function(i
我刚刚开始使用 AndEngine, 我正在像这样移动 Sprite : if(pValueY < 0 && !jumping) { jumping =
我正在使用 asynctask 来执行冗长的操作,例如数据库读取。我想开始一个新 Activity 并在所有异步任务完成后呈现其内容。实现这一目标的最佳方法是什么? 我知道 onPostExecute
我有一个脚本需要命令名称和该命令的参数作为参数。 所以我想编写一个完成函数来完成命令的名称并完成该命令的参数。 所以我可以这样完成命令的名称 if [[ "$COMP_CWORD" == 1 ]];
我的应用程序有一个相当奇怪的行为。我在 BOOT_COMPLETE 之后启动我的应用程序,因此在我启动设备后它是可见的。 GUI 响应迅速,一切正常,直到我调用 finish(),按下按钮时,什么都没
我是一名优秀的程序员,十分优秀!