gpt4 book ai didi

Java正确使用ExecutorService、CompletionService、BlockingQueue和Observer?

转载 作者:行者123 更新时间:2023-11-30 04:59:40 25 4
gpt4 key购买 nike

所以,我对多线程还很陌生,并且最近在我的所有程序中都使用了这个想法。在我开始更多地使用它之前,我真的想确保它是使用 Executor、CompletionService 和 BlockingQueue 以及 Observer 来实现多线程的正确有效方法。我将在下面提供示例代码,但首先让我快速解释一下我认为它是如何工作的,也许这会有所帮助。

我拥有的第一件事是一个 BlockingQueue,所有任务都通过 add(Task task) 方法添加到该队列中。创建类后,将调用 run 方法,并使用 while(true) 调用 take 来阻塞队列,直到有内容添加到任务队列中。

一旦将某些内容添加到 run() 内的队列中,queue.take() 就会返回队列中的项目。然后我将这个项目传递给 WorkerThread 类来处理它。该workerThread被添加到CompletionService池中,该池处理等待线程完成的情况。

好的,现在是我不确定是否正确的部分。我还有一个实现 runnable 的内部类,并在类初始化时启动。它的工作是永远循环调用 pool.take()。因此,这本质上是等待其中一个 WorkerThread 完成。我让完成服务处理这个问题。一旦 take() 获取值,内部类就会将其传递给通知观察者方法。

这个实现可以吗?让我有点担心的是,主类在任务队列上运行 while(true) 循环,并且内部类也在池上循环等待从 WorkerThread 接收结果?

这是一个示例实现。你觉得怎么样?

     public class HttpSchedulerThreaded extends Observable implements Runnable  {

private ArrayList<Object> list;//holds [0]=VULNINFO, [1]=REQUESTBUILDER OBJECT
protected static Logger logger = Logger.getLogger(HttpScheduler.class.getName());
private CompletionService<VulnInfo> pool;
private ExecutorService executor ;
private Thread responseWorkerThread;
private HttpSchedulerWorker schedulerWorker;
private boolean shouldRun = true;
private CountDownLatch doneSignal;
private String[] vulnClassesIgnoreRedirect;
private boolean followRedirects;
private boolean runJavascriptInResponse;
private boolean isSSL;
private int numThreadsInPool;
private BlockingQueue<VulnInfo> queue;
private boolean isRunning ;
public HttpSchedulerThreaded(int numThreads)
{
numThreadsInPool = numThreads;
executor = Executors.newFixedThreadPool(numThreads);
doneSignal = new CountDownLatch(numThreads);
pool = new ExecutorCompletionService<VulnInfo>(executor);
schedulerWorker = new HttpSchedulerWorker();
responseWorkerThread = new Thread(schedulerWorker);
queue = new LinkedBlockingQueue<VulnInfo>();
}

public HttpSchedulerThreaded()
{
numThreadsInPool = 1;
executor = Executors.newFixedThreadPool(1);
doneSignal = new CountDownLatch(1);
pool = new ExecutorCompletionService<VulnInfo>(executor);
schedulerWorker = new HttpSchedulerWorker();
responseWorkerThread = new Thread(schedulerWorker);
queue = new LinkedBlockingQueue<VulnInfo>();
}

public void setThreadCount(int numThreads)
{
if(!isRunning){
executor = Executors.newFixedThreadPool(numThreads);
doneSignal = new CountDownLatch(numThreads);
pool = new ExecutorCompletionService<VulnInfo>(executor);
numThreadsInPool = numThreads;
}
}


public void start()
{
if(!isRunning){
responseWorkerThread.start();
new Thread(this).start();
isRunning = true;
}

}


public void add(VulnInfo info) {
queue.add(info);
}

@Override
public void run() {
// TODO Auto-generated method stub
while(shouldRun)
{
try {
VulnInfo info = queue.take();
Callable<VulnInfo> worker = new HttpSchedulerRequestSender(info,followRedirects,runJavascriptInResponse,vulnClassesIgnoreRedirect,doneSignal);
//System.out.println("submitting to pooler: " + info.getID());
pool.submit(worker);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}

/**
* Inner class of proxy is a worker thread blocks until the pool has transactions complete as soon as they
* are complete it will send them to server for completion.
* @author Steve
*
*/
class HttpSchedulerWorker implements Runnable{

public void run() {
// TODO Auto-generated method stub
while(true)
{
VulnInfo vulnInfo = null;
try {
//System.out.println("taking finished request");
Future<VulnInfo> tmp = pool.take();
// Future<VulnInfo> tmp = pool.poll();
if(tmp != null)
vulnInfo = tmp.get();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (ExecutionException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

if(vulnInfo != null)
{
//System.out.println("updating all observers: " + vulnInfo.getID());
updateObservers(vulnInfo);
}



}
}

}

最佳答案

根据我的经验,你的解决方案似乎没问题。我有三点意见/建议:

  1. 创建新的执行线程 responseWorkerThread = new Thread(schedulerWorker)responseWorkerThread.start() 后,您基本上就已经分解了这两个循环。这部分看起来还不错。您似乎确实正确使用了 Executor 的 API,但看起来您可能需要更多代码来停止 HttpScheduledWorker 线程并关闭 ExecutionCompletionService 作为 HttpSchedulerThreaded 类的一部分。
  2. 我不确定您是否真的有必要使用队列ExecutionCompletionService 已使用 BlockingQueue 来管理提交给它的任务。
  3. 您的“问题”可能更适合 beta Code Review网站。

关于Java正确使用ExecutorService、CompletionService、BlockingQueue和Observer?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/7286945/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com