gpt4 book ai didi

java - 为什么 ThreadPoolExecutor 会在 keepAliveTime 之后将线程减少到 corePoolSize 以下?

转载 作者:塔克拉玛干 更新时间:2023-11-03 03:21:52 25 4
gpt4 key购买 nike

我一直在研究使用 ThreadPoolExecutor 和 JDK6 进行线程池的不同策略。我有一个优先级队列在工作,但不确定我是否喜欢在 keepAliveTime 之后池没有调整大小的方式(无界队列得到的结果)。因此,我正在查看使用 LinkedBlockingQueue 和 CallerRuns 策略的 ThreadPoolExecutor。

我现在遇到的问题是池增加,正如文档所解释的那样,但是在任务完成并且 keepAliveTime 开始运行后,getPoolSize 显示池减少到零。下面的示例代码应该让您了解我的问题的基础:

public class ThreadPoolingDemo {
private final static Logger LOGGER =
Logger.getLogger(ThreadPoolingDemo.class.getName());

public static void main(String[] args) throws Exception {
LOGGER.info("MAIN THREAD:starting");
runCallerTestPlain();
}

private static void runCallerTestPlain() throws InterruptedException {
//10 core threads,
//50 max pool size,
//100 tasks in queue,
//at max pool and full queue - caller runs task
ThreadPoolExecutor tpe = new ThreadPoolExecutor(10, 50,
5L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(100),
new ThreadPoolExecutor.CallerRunsPolicy());

//dump 5000 tasks on the queue
for (int i = 0; i < 5000; i++) {
tpe.submit(new Runnable() {
@Override
public void run() {
//just to eat some time and give a little feedback
for (int j = 0; j < 20; j++) {
LOGGER.info("First-batch Task, looping:" + j + "["
+ Thread.currentThread().getId() + "]");
}
}
}, null);
}
LOGGER.info("MAIN THREAD:!!Done queueing!!");

//check tpe statistics forever
while (true) {
LOGGER.info("Active count: " + tpe.getActiveCount() + " Pool size: "
+ tpe.getPoolSize() + " Largest Pool: " + tpe.getLargestPoolSize());
Thread.sleep(1000);
}
}
}

我发现了一个似乎是这个问题但已关闭的旧错误:http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6458662 .这是否仍然存在于 1.6 中,还是我遗漏了什么?

看起来我 Rubber Ducked 这个 ( http://www.codinghorror.com/blog/2012/03/rubber-duck-problem-solving.html )。我上面链接的错误与此相关:http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6576792 ,问题似乎在 1.7 中得到解决(我加载了 1.7 并验证 - 已修复......)。我想我的主要问题是这个基本问题存在了将近十年的错误。我花了太多时间写这篇文章,现在没有发布,希望它能帮助别人。

最佳答案

... after the tasks complete and the keepAliveTime comes into play getPoolSize shows the pool getting reduced to zero.

所以这看起来是 ThreadPoolExecutor 中的竞争条件。我猜它正在按照设计工作,尽管不是预期的。在工作线程循环从阻塞队列中获取任务的 getTask() 方法中,您会看到以下代码:

if (state == SHUTDOWN)  // Help drain queue
r = workQueue.poll();
else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
else
r = workQueue.take();
if (r != null)
return r;
if (workerCanExit()) {
if (runState >= SHUTDOWN) // Wake up others
interruptIdleWorkers();
return null;
}

如果 poolSize 增长到 corePoolSize 以上,那么如果轮询在 keepAliveTime 之后超时,代码将下降到 workerCanExit( ) 因为 rnull。所有线程都可以从该方法返回 true,因为它只是测试 poolSize 的状态:

    mainLock.lock();
boolean canExit;
try {
canExit = runState >= STOP ||
workQueue.isEmpty() ||
(allowCoreThreadTimeOut &&
poolSize > Math.max(1, corePoolSize)); << test poolSize here
} finally {
mainLock.unlock(); << race to workerDone() begins
}

一旦返回 true,工作线程就会退出,然后 poolSize 会递减。如果所有工作线程同时进行该测试,那么它们将全部退出,因为 poolSize 的测试与 --poolSize 时工作线程停止之间的竞争> 发生。

令我吃惊的是竞争条件的一致性。如果您在下面的 run() 中的 sleep() 中添加一些随机化,那么您可以让一些核心线程不退出,但我认为竞争条件会更难被击中。


您可以在以下测试中看到此行为:

@Test
public void test() throws Exception {
int before = Thread.activeCount();
int core = 10;
int max = 50;
int queueSize = 100;
ThreadPoolExecutor tpe =
new ThreadPoolExecutor(core, max, 1L, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(queueSize),
new ThreadPoolExecutor.CallerRunsPolicy());
tpe.allowCoreThreadTimeOut(false);
assertEquals(0, tpe.getActiveCount());
// if we start 1 more than can go into core or queue, poolSize goes to 0
int startN = core + queueSize + 1;
// if we only start jobs the core can take care of, then it won't go to 0
// int startN = core + queueSize;
for (int i = 0; i < startN; i++) {
tpe.submit(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
while (true) {
System.out.println("active = " + tpe.getActiveCount() + ", poolSize = " + tpe.getPoolSize()
+ ", largest = " + tpe.getLargestPoolSize() + ", threads = " + (Thread.activeCount() - before));
Thread.sleep(1000);
}
}

如果您将 run() 方法中的 sleep 行更改为如下所示:

private final Random random = new Random();
...
Thread.sleep(100 + random.nextInt(100));

这将使竞争条件更难命中,因此一些核心线程仍将存在。

关于java - 为什么 ThreadPoolExecutor 会在 keepAliveTime 之后将线程减少到 corePoolSize 以下?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/9912372/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com