- Java 双重比较
- java - 比较器与 Apache BeanComparator
- Objective-C 完成 block 导致额外的方法调用?
- database - RESTful URI 是否应该公开数据库主键?
我想使用 CompletionService 来处理来自一系列线程的结果当它们完成时。我有一个循环中的服务来获取它提供的 Future 对象,但我不知道确定所有线程何时完成(并因此退出循环)的最佳方法:
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
public class Bar {
final static int MAX_THREADS = 4;
final static int TOTAL_THREADS = 20;
public static void main(String[] args) throws Exception{
final ThreadPoolExecutor threadPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(MAX_THREADS);
final CompletionService<Integer> service = new ExecutorCompletionService<Integer>(threadPool);
for (int i=0; i<TOTAL_THREADS; i++){
service.submit(new MyCallable(i));
}
int finished = 0;
Future<Integer> future = null;
do{
future = service.take();
int result = future.get();
System.out.println(" took: " + result);
finished++;
}while(finished < TOTAL_THREADS);
System.out.println("Shutting down");
threadPool.shutdown();
}
public static class MyCallable implements Callable<Integer>{
final int id;
public MyCallable(int id){
this.id = id;
System.out.println("Submitting: " + id);
}
@Override
public Integer call() throws Exception {
Thread.sleep(1000);
System.out.println("finished: " + id);
return id;
}
}
}
我已尝试检查 ThreadPoolExecutor 的状态,但我知道 getCompletedTaskCount 和 getTaskCount 方法只是近似值,不应依赖。有没有比我自己计算更好的方法来确保我从 CompletionService 中检索到所有 Futures?
编辑:Nobeh 提供的链接和 this link建议计算提交的任务数,然后多次调用 take() 是可行的方法。我很惊讶没有办法询问 CompletionService 或其 Executor 还剩下什么要返回。
最佳答案
参见 http://www.javaspecialists.eu/archive/Issue214.html有关如何扩展 ExecutorCompletionService 以执行您正在寻找的内容的体面建议。为了您的方便,我在下面粘贴了相关代码。作者还建议让服务实现 Iterable,我认为这是个好主意。
FWIW,我同意你的看法,这确实应该成为标准实现的一部分,但遗憾的是,事实并非如此。
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
public class CountingCompletionService<V> extends ExecutorCompletionService<V> {
private final AtomicLong submittedTasks = new AtomicLong();
private final AtomicLong completedTasks = new AtomicLong();
public CountingCompletionService(Executor executor) {
super(executor);
}
public CountingCompletionService(
Executor executor, BlockingQueue<Future<V>> queue) {
super(executor, queue);
}
public Future<V> submit(Callable<V> task) {
Future<V> future = super.submit(task);
submittedTasks.incrementAndGet();
return future;
}
public Future<V> submit(Runnable task, V result) {
Future<V> future = super.submit(task, result);
submittedTasks.incrementAndGet();
return future;
}
public Future<V> take() throws InterruptedException {
Future<V> future = super.take();
completedTasks.incrementAndGet();
return future;
}
public Future<V> poll() {
Future<V> future = super.poll();
if (future != null) completedTasks.incrementAndGet();
return future;
}
public Future<V> poll(long timeout, TimeUnit unit)
throws InterruptedException {
Future<V> future = super.poll(timeout, unit);
if (future != null) completedTasks.incrementAndGet();
return future;
}
public long getNumberOfCompletedTasks() {
return completedTasks.get();
}
public long getNumberOfSubmittedTasks() {
return submittedTasks.get();
}
public boolean hasUncompletedTasks() {
return completedTasks.get() < submittedTasks.get();
}
}
关于java - 如何知道 CompletionService 何时完成交付结果?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/9987019/
我需要配置重试策略以通过 ExecutorCompletionService 调用 API。 示例代码: public void func() throws Exception{ Execut
我尝试在 Java 上制作多线程程序,打印返回已完成线程的结果。问题是,当我运行此代码时,它只是卡在队列中的第二个值上: System.out.println("[!] Creaing
我有这样的东西: ExecutorService executor = Executors.newFixedThreadPool(2); CompletionService completionSer
所以,我对多线程还很陌生,并且最近在我的所有程序中都使用了这个想法。在我开始更多地使用它之前,我真的想确保它是使用 Executor、CompletionService 和 BlockingQueue
我需要做这样的事情: ExecutorService executor = Executors.newFixedThreadPool(2); CompletionService completionS
java.util.concurrent.CompletionService 是对 ExecutorService 的一个功能增强封装,优化了获取异步操作结果的接口。 假设我们要向线程池提交一批任务,
如果'使用 System.Linq;'存在并且 System.Linq 是引用的程序集之一,我希望 int[] 数组上的 Completions 返回 LINQ 扩展方法,例如Select<>(...
我对 Java 并发包非常陌生。我使用 java.util.concurrent.CompletionService 类从池中获取结果,而无需等待其他线程完成任务。但即使执行了 10 分钟的代码,我也
我正在研究 CompletionService 类,我发现提交队列与完成队列的解耦非常有用。 但我也想念一种方法来轮询/接受已取消的任务(这在某种程度上可以被视为已完成)。可以通过某种方式轻松完成吗?
我需要在多核机器上处理大量(>1亿)请求(每个请求是处理一个数据文件中的一行,涉及到一些与远程系统的I/O。虽然细节做没关系,具体任务是从一些数据文件中加载一个分布式的 Hazelcast map )
我有一个在 Tomcat 容器内运行的简单 Web 服务,它本质上是多线程的。在进入服务的每个请求中,我想对外部服务进行并发调用。 java.util.concurrent 中的 ExecutorCo
我有一个由多个线程访问的类,每个线程请求该类的一个方法。每个方法依次执行多个 Callables。该类使用ExecutorService中的threadPool通过invokeAll((Collect
我使用包裹在 2 线程 FixedThreadPool ExecutorService 周围的 CompletionService 提交了一些 Future 任务,然后我设置了一个等于提交任务数量的循
我想使用 CompletionService 来处理来自一系列线程的结果当它们完成时。我有一个循环中的服务来获取它提供的 Future 对象,但我不知道确定所有线程何时完成(并因此退出循环)的最佳方法
AbstractExecutorService public abstract class AbstractExecutorService implements ExecutorService
我刚刚在 this blog post 中找到了 CompletionService .但是,这并没有真正展示 CompletionService 相对于标准 ExecutorService 的优势。
我正在尝试提交多个任务并在可用时获取结果。但是,循环结束后,我必须强制所有任务在指定的时间内完成。如果没有,则抛出错误。最初,我所拥有的只是 executorService 的 invokeAll、s
可以安全地假设 java.util.concurrent.CompletionService.take().isDone() 总是返回true?如果是这样,为什么 take() 返回一个 Future
有很多关于如何等待所有“worker”作业完成的示例,但是如何在每个 worker 完成时以与 Java 的 CompletionService 相同的方式使用react呢? 我已经破解了一些有用的东
我们需要在一个线程中完成一些工作,该线程将在使用 Spring Boot 的 Web 应用程序的服务器上运行。 我们想要发生的是:1) 工作作为我们自定义的 Runnable 对象提交2)轮到工作时就
我是一名优秀的程序员,十分优秀!