gpt4 book ai didi

java - 改变 ThreadPoolExecutor

转载 作者:行者123 更新时间:2023-11-30 09:15:52 34 4
gpt4 key购买 nike

如以下链接所述:-

How to get the ThreadPoolExecutor to increase threads to max before queueing?

我将队列实现更改为在进入元素后返回 false。因此,无论何时将新任务插入队列,都会为其创建一个新线程。

但是当我在放置记录器的情况下大规模运行以下实现(Bis 系统测试)时,会产生一个新问题。

当一个任务来执行时,它会被插入到队列中,当队列返回 false 时,会创建一个新线程来执行它。池中当前存在的空闲线程不会被拾取。原因是任务从 getTask() 方法分配给空闲线程,该方法从队列中挑选任务。所以我的问题是如何改变这种行为,以便如果线程空闲,如何确保为空闲线程分配执行任务而不是创建新线程??

下面的输出会更清楚:-

Task 46 ends
Active Count: 0 Pool Size : 3 Idle Count: 3 Queue Size: 0
Task 47 ends
Active Count: 0 Pool Size : 3 Idle Count: 3 Queue Size: 0
Task 48 ends
Active Count: 0 Pool Size : 3 Idle Count: 3 Queue Size: 0
Active Count: 1 Pool Size : 4 Idle Count: 3 Queue Size: 0
Task 49 ends
Active Count: 2 Pool Size : 5 Idle Count: 3 Queue Size: 0
Task 50 ends
Active Count: 2 Pool Size : 5 Idle Count: 3 Queue Size: 0

代码文件如下:-

ThreadPoolExecutor 的版本是 java 1.5,因为我们在服务器机器上使用的是 1.5,无法升级。

线程池执行器:-

 public void execute(Runnable command) {
System.out.println("Active Count: " + getActiveCount()
+ " Pool Size : " + getPoolSize() + " Idle Count: "
+ (getPoolSize() - getActiveCount())+" Queue Size: "+getQueue().size());
if (command == null)
throw new NullPointerException();
for (;;) {
if (runState != RUNNING) {
reject(command);
return;
}
if (poolSize < corePoolSize && addIfUnderCorePoolSize(command))
return;
if (workQueue.offer(command))
return;
int status = addIfUnderMaximumPoolSize(command);
if (status > 0) // created new thread
return;
if (status == 0) { // failed to create thread
reject(command);
return;
}
// Retry if created a new thread but it is busy with another task
}
}

LinkedBlockingQueue:-

public class CustomBlockingQueue<E> extends LinkedBlockingQueue<E> 
{

/**
*
*/
private static final long serialVersionUID = 1L;

public CustomBlockingQueue() {
super(Integer.MAX_VALUE);
}

public boolean offer(E e) {
return false;
}

}

在拒绝处理程序中,我们正在调用我们尚未覆盖的队列的 put 方法

调用执行器

   final CustomThreadPoolExecutor tpe = new CustomThreadPoolExecutor(3, 8, 0L, TimeUnit.MILLISECONDS, new MediationBlockingQueue<Runnable>(), new MediationRejectionHandler());

private static final int TASK_COUNT = 100;

for (int i = 0; i < TASK_COUNT; i++) {

......
tpe.execute(new Task(i));
.....
}

我们正在调用核心池大小为 3 的执行程序,最大池大小为 8,并为任务使​​用无界链接阻塞队列。

最佳答案

使用 SynchronousQueue 实现“在排队前开始但更喜欢现有线程”行为的最简单方法.当且仅当已经有等待接收者时,它才会接受提供的元素。因此,空闲线程将获取项目,一旦没有空闲线程,ThreadPoolExecutor 将启动新线程。

唯一的缺点是,一旦所有线程都启动了,你不能简单地将待处理的项目放入队列中,因为它没有容量。因此,您要么必须接受提交者被阻塞,要么您需要另一个队列来将待处理的任务放入其中,而另一个后台线程会尝试将这些待处理的项目放入同步队列。这个额外的线程不会影响性能,因为它在大多数时间都被阻塞在这两个队列中的任何一个中。

class QueuingRejectionHandler implements RejectedExecutionHandler {

final ExecutorService processPending=Executors.newSingleThreadExecutor();

public void rejectedExecution(
final Runnable r, final ThreadPoolExecutor executor) {
processPending.execute(new Runnable() {
public void run() {
executor.execute(r);
}
});
}
}

ThreadPoolExecutor e=new ThreadPoolExecutor(
corePoolSize, maximumPoolSize, keepAliveTime, unit,
new SynchronousQueue<Runnable>(), new QueuingRejectionHandler());

关于java - 改变 ThreadPoolExecutor,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/19677308/

34 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com