gpt4 book ai didi

java - 如何知道 CompletionService 何时完成交付结果?

转载 作者:搜寻专家 更新时间:2023-10-31 19:48:32 25 4
gpt4 key购买 nike

我想使用 CompletionService 来处理来自一系列线程的结果当它们完成时。我有一个循环中的服务来获取它提供的 Future 对象,但我不知道确定所有线程何时完成(并因此退出循环)的最佳方法:

import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;

public class Bar {

final static int MAX_THREADS = 4;
final static int TOTAL_THREADS = 20;

public static void main(String[] args) throws Exception{

final ThreadPoolExecutor threadPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(MAX_THREADS);
final CompletionService<Integer> service = new ExecutorCompletionService<Integer>(threadPool);

for (int i=0; i<TOTAL_THREADS; i++){
service.submit(new MyCallable(i));
}

int finished = 0;
Future<Integer> future = null;
do{
future = service.take();
int result = future.get();
System.out.println(" took: " + result);
finished++;

}while(finished < TOTAL_THREADS);

System.out.println("Shutting down");
threadPool.shutdown();
}


public static class MyCallable implements Callable<Integer>{

final int id;

public MyCallable(int id){
this.id = id;
System.out.println("Submitting: " + id);
}

@Override
public Integer call() throws Exception {
Thread.sleep(1000);
System.out.println("finished: " + id);
return id;
}
}
}

我已尝试检查 ThreadPoolExecutor 的状态,但我知道 getCompletedTaskCount 和 getTaskCount 方法只是近似值,不应依赖。有没有比我自己计算更好的方法来确保我从 CompletionService 中检索到所有 Futures?


编辑:Nobeh 提供的链接和 this link建议计算提交的任务数,然后多次调用 take() 是可行的方法。我很惊讶没有办法询问 CompletionService 或其 Executor 还剩下什么要返回。

最佳答案

参见 http://www.javaspecialists.eu/archive/Issue214.html有关如何扩展 ExecutorCompletionService 以执行您正在寻找的内容的体面建议。为了您的方便,我在下面粘贴了相关代码。作者还建议让服务实现 Iterable,我认为这是个好主意。

FWIW,我同意你的看法,这确实应该成为标准实现的一部分,但遗憾的是,事实并非如此。

import java.util.concurrent.*;
import java.util.concurrent.atomic.*;

public class CountingCompletionService<V> extends ExecutorCompletionService<V> {
private final AtomicLong submittedTasks = new AtomicLong();
private final AtomicLong completedTasks = new AtomicLong();

public CountingCompletionService(Executor executor) {
super(executor);
}

public CountingCompletionService(
Executor executor, BlockingQueue<Future<V>> queue) {
super(executor, queue);
}

public Future<V> submit(Callable<V> task) {
Future<V> future = super.submit(task);
submittedTasks.incrementAndGet();
return future;
}

public Future<V> submit(Runnable task, V result) {
Future<V> future = super.submit(task, result);
submittedTasks.incrementAndGet();
return future;
}

public Future<V> take() throws InterruptedException {
Future<V> future = super.take();
completedTasks.incrementAndGet();
return future;
}

public Future<V> poll() {
Future<V> future = super.poll();
if (future != null) completedTasks.incrementAndGet();
return future;
}

public Future<V> poll(long timeout, TimeUnit unit)
throws InterruptedException {
Future<V> future = super.poll(timeout, unit);
if (future != null) completedTasks.incrementAndGet();
return future;
}

public long getNumberOfCompletedTasks() {
return completedTasks.get();
}

public long getNumberOfSubmittedTasks() {
return submittedTasks.get();
}

public boolean hasUncompletedTasks() {
return completedTasks.get() < submittedTasks.get();
}
}

关于java - 如何知道 CompletionService 何时完成交付结果?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/9987019/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com