gpt4 book ai didi

Java ThreadPoolExecutor 策略, 'Direct Handoff' 带队列?

转载 作者:塔克拉玛干 更新时间:2023-11-03 05:18:41 26 4
gpt4 key购买 nike

我希望有一个 ThreadPoolExecutor,我可以在其中设置一个 corePoolSize 和一个 maximumPoolSize,然后队列将切换任务立即进入线程池,从而创建新线程,直到达到 maximumPoolSize,然后开始添加到队列中。

有这样的事吗?如果没有,它没有这样的策略有什么充分的理由吗?

我本质上想要的是提交任务执行,当它达到一个点,它基本上会因为有太多线程(通过设置 maximumPoolSize)而获得“最差”性能时,它将停止添加新线程并且使用该线程池并开始排队,然后如果队列已满则拒绝。

当负载回落时,它可以开始将未使用的线程拆除回 corePoolSize。

在我的申请中,这比 http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/ThreadPoolExecutor.html 中列出的“三个通用策略”更有意义

最佳答案

注意:这些实现有些缺陷且不确定。请在使用此代码之前阅读整个答案和评论。

如何创建一个工作队列,在执行程序低于最大池大小时拒绝项目,并在达到最大值后开始接受它们?

这依赖于记录的行为:

“如果一个请求不能被排队,一个新的线程被创建除非这个将超过 maximumPoolSize,在这种情况下,任务将是被拒绝了。”

public class ExecutorTest
{
private static final int CORE_POOL_SIZE = 2;
private static final int MAXIMUM_POOL_SIZE = 4;
private static final int KEEP_ALIVE_TIME_MS = 5000;

public static void main(String[] args)
{
final SaturateExecutorBlockingQueue workQueue =
new SaturateExecutorBlockingQueue();

final ThreadPoolExecutor executor =
new ThreadPoolExecutor(CORE_POOL_SIZE,
MAXIMUM_POOL_SIZE,
KEEP_ALIVE_TIME_MS,
TimeUnit.MILLISECONDS,
workQueue);

workQueue.setExecutor(executor);

for (int i = 0; i < 6; i++)
{
final int index = i;
executor.submit(new Runnable()
{
public void run()
{
try
{
Thread.sleep(1000);
}
catch (InterruptedException e)
{
e.printStackTrace();
}

System.out.println("Runnable " + index
+ " on thread: " + Thread.currentThread());
}
});
}
}

public static class SaturateExecutorBlockingQueue
extends LinkedBlockingQueue<Runnable>
{
private ThreadPoolExecutor executor;

public void setExecutor(ThreadPoolExecutor executor)
{
this.executor = executor;
}

public boolean offer(Runnable e)
{
if (executor.getPoolSize() < executor.getMaximumPoolSize())
{
return false;
}
return super.offer(e);
}
}
}

注意:您的问题让我感到惊讶,因为我希望您期望的行为是配置有 corePoolSize < maximumPoolSize 的 ThreadPoolExecutor 的默认行为。但是正如您指出的那样,ThreadPoolExecutor 的 JavaDoc 明确指出了其他情况。


想法#2

我认为我有一个可能稍微好一点的方法。它依赖于编码到 ThreadPoolExecutor 中的 setCorePoolSize 方法中的副作用行为。这个想法是在工作项排队时临时有条件地增加核心池大小。当增加核心池大小时,ThreadPoolExecutor 将立即产生足够的新线程来执行所有排队的 (queue.size()) 任务。然后我们立即减小核心池大小,这允许线程池在未来的低 Activity 期间自然缩小。这种方法仍然不是完全确定的(例如,池大小有可能增长到超过最大池大小),但我认为在几乎所有情况下它都比第一种策略更好。

具体来说,我认为这种方法比第一种方法更好,因为:

  1. 它会更频繁地重用线程
  2. 不会因为赛跑而拒绝执行
  3. 我想再次提及,第一种方法会导致线程池增长到其最大大小,即使在非常轻的使用情况下也是如此。这种方法在这方面应该更有效。

-

public class ExecutorTest2
{
private static final int KEEP_ALIVE_TIME_MS = 5000;
private static final int CORE_POOL_SIZE = 2;
private static final int MAXIMUM_POOL_SIZE = 4;

public static void main(String[] args) throws InterruptedException
{
final SaturateExecutorBlockingQueue workQueue =
new SaturateExecutorBlockingQueue(CORE_POOL_SIZE,
MAXIMUM_POOL_SIZE);

final ThreadPoolExecutor executor =
new ThreadPoolExecutor(CORE_POOL_SIZE,
MAXIMUM_POOL_SIZE,
KEEP_ALIVE_TIME_MS,
TimeUnit.MILLISECONDS,
workQueue);

workQueue.setExecutor(executor);

for (int i = 0; i < 60; i++)
{
final int index = i;
executor.submit(new Runnable()
{
public void run()
{
try
{
Thread.sleep(1000);
}
catch (InterruptedException e)
{
e.printStackTrace();
}

System.out.println("Runnable " + index
+ " on thread: " + Thread.currentThread()
+ " poolSize: " + executor.getPoolSize());
}
});
}

executor.shutdown();

executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
}

public static class SaturateExecutorBlockingQueue
extends LinkedBlockingQueue<Runnable>
{
private final int corePoolSize;
private final int maximumPoolSize;
private ThreadPoolExecutor executor;

public SaturateExecutorBlockingQueue(int corePoolSize,
int maximumPoolSize)
{
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
}

public void setExecutor(ThreadPoolExecutor executor)
{
this.executor = executor;
}

public boolean offer(Runnable e)
{
if (super.offer(e) == false)
{
return false;
}
// Uncomment one or both of the below lines to increase
// the likelyhood of the threadpool reusing an existing thread
// vs. spawning a new one.
//Thread.yield();
//Thread.sleep(0);
int currentPoolSize = executor.getPoolSize();
if (currentPoolSize < maximumPoolSize
&& currentPoolSize >= corePoolSize)
{
executor.setCorePoolSize(currentPoolSize + 1);
executor.setCorePoolSize(corePoolSize);
}
return true;
}
}
}

关于Java ThreadPoolExecutor 策略, 'Direct Handoff' 带队列?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/9622599/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com