gpt4 book ai didi

java - 正确实现生产者-消费者场景以及 "graceful"线程池终止

转载 作者:行者123 更新时间:2023-12-01 15:54:38 25 4
gpt4 key购买 nike

我正在开发我的第一个多线程项目,因此有一些我不确定的事情。有关我的设置的详细信息位于 previous question简而言之:我有一个由 Executors.newFixedThreadPool(N) 实现的线程池。一个线程被赋予一个操作,该操作对本地和远程资源进行一系列查询并迭代填充 ArrayBlockingQueue,而其余线程则调用该操作上的 take() 方法。排队并处理队列中的对象。

尽管小型和受监督的测试似乎运行正常,但我不确定如何处理特殊场景,例如开始(队列还没有项目)、结束(队列已清空)以及任何最终中断异常。我在这里读了一些关于 SO 的文章,然后我读到了 Goetz 写的两篇非常好的文章。和 Kabutz 。人们似乎一致认为不应忽视这些异常(exception)情况。但是我不确定提供的示例与我的情况有何关系,我没有在代码中的任何地方调用 thread.interrupt()...说到这一点,我不确定我是否应该这样做所以...

总而言之,鉴于下面的代码,我如何最好地处理特殊情况,例如终止条件和 InterrrruptedExceptions?希望这些问题有意义,否则我会尽力进一步描述。

提前致谢,

<小时/>

编辑:我已经致力于实现一段时间了,我遇到了一个新的问题,所以我想我应该更新一下情况。我不幸遇到了 ConcurrentModificationException,这很可能是由于线程池的不完全关闭/终止造成的。当我发现我可以使用 isTermminate() 时,我就尝试了,然后由于不同步 wait(),我得到了 IllegalMonitorStateException 。代码的当前状态如下:

我遵循了@Jonathan 的回答中的一些建议,但是我认为他的建议并不像我需要/想要的那样。背景故事和我上面提到的一样,相关代码如下:

类持有/管理池,并提交可运行对象:

public void serve() {
try {
this.started = true;
pool.execute(new QueryingAction(pcqs));
for(;;){
PathwayImpl p = bq.take();

if (p.getId().equals("0")){
System.out.println("--DEBUG: Termination criteria found, shutdown initiated..");
pool.shutdown();
// give 3 minutes per item in queue to finish up
pool.awaitTermination(3 * bq.size(), TimeUnit.MINUTES);
break;
}
int sortMethod = AnalysisParameters.getInstance().getSort_method();
pool.submit(new AnalysisAction(p));
}
} catch (Exception ex) {
ex.printStackTrace();
System.err.println("Unexpected error in core analysis, terminating execution!");
System.exit(0);
}finally{ pool.shutdown(); }
}

public boolean isDone(){
if(this.started)
return pool.isTerminated();
else
return false;
}
<小时/>

通过位于单独类中的以下代码将元素添加到队列:

this.queue.offer(path, offer_wait, TimeUnit.MINUTES);

...offer() 而不是 take() 背后的动机正如 Jonathan 提到的。不可预见的 block 很烦人并且很难弄清楚,因为我的分析需要很长时间。所以我需要相对快速地知道失败是否是由于坏 block ,或者是否只是处理数字......

<小时/>

最后;这是我的测试类中的代码,我在其中检查“并发服务”(此处名为 cs)与其余要分析的对象之间的交互:

cs.serve();
synchronized (this) {
while(!cs.isDone())
this.wait(5000);
}
ReportGenerator rg = new ReportGenerator();
rg.doReports();

我意识到这是一个很长的问题,但我试图详细而具体。希望这不会太拖累,如果是的话,我深表歉意......

最佳答案

不要使用会阻塞的 take,而是使用类似这样的东西:

PathwayImpl p = null;
synchronized (bq) {
try {
while (bq.isEmpty() && !stopSignal) {
bq.wait(3000); // Wait up to 3 seconds and check again
}

if (!stopSignal) {
p = bq.poll();
}
}
catch (InterruptedException ie) {
// Broke us out of waiting, loop around to test the stopSignal again
}
}

这假设该 block 包含在某种 while (!stopSignal) {...} 中。

然后,在添加到队列的代码中,执行以下操作:

synchronized (bq) {
bq.add(item);
bq.notify();
}

对于InterruptedException,它们适合向线程发出信号以立即测试停止信号,而不是等到下一次超时和测试。我建议再次测试您的停止信号,并可能记录异常。

我在发出 panic 信号时使用它们,而不是正常关闭,但这种情况很少有必要。

关于java - 正确实现生产者-消费者场景以及 "graceful"线程池终止,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/5326013/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com