gpt4 book ai didi

动态扩展线程数的 Java ExecutorService

转载 作者:行者123 更新时间:2023-12-03 12:48:06 26 4
gpt4 key购买 nike

我有一个工作单元列表,我想并行处理它们。每个单元工作 8-15 秒,完全计算时间,没有 I/O 阻塞。我想要实现的是拥有一个 ExecutorService那:

  • 当没有工作要做时,实例化的线程为零
  • 如果需要,可以动态扩展至 20 个线程
  • 允许我一次添加所有工作单元(不阻止提交)

类似:

Queue<WorkResult> queue = new ConcurrentLinkedDeque<>();
ExecutorService service = ....
for(WorkUnit unit : list) {
service.submit(() -> {
.. do some work ..
queue.offer(result);
);
}
while(queue.peek() != null) {
... process results while they arrive ...
}

我没有成功的尝试是:

  • 使用 newCachedThreadPool()创建了太多线程
  • 然后我使用了它的内部调用 new ThreadPoolExecutor(0, 20, 60L, SECONDS, new SynchronousQueue<>()) ,但后来我注意到 submit() 由于同步队列而阻塞
  • 所以我使用了new LinkedBlockingQueue() ,只是为了发现 ThreadPoolExecutor 只产生一个线程

我确信有官方实现来处理这个非常基本的并发用例。有人可以建议吗?

最佳答案

创建 ThreadPoolExecutor 使用 LinkedBlockingQueue20作为 corePoolSize (构造函数中的第一个参数):

new ThreadPoolExecutor(20, 20, 60L, SECONDS, new LinkedBlockingQueue<>());


如果您使用 LinkedBlockingQueue 没有预定义的容量 Pool :

  • Won't曾经检查过maxPoolSize .
  • 不会创建多于 corePoolSize 的线程的指定号码。

在您的情况下,只会执行一个线程。你很幸运能得到一个,因为你把它设置为 0previous versions of Java (<1.6) 如果 corePoolSize 则不会创建任何设置为 0 (他们怎么敢?)

即使 corePoolSize 之后的版本也会创建一个新线程。是 0 ,这看起来像……一个修复……一个错误……改变……一个逻辑行为

Thread Pool Executor

Using an unbounded queue (for example a LinkedBlockingQueue without apredefined capacity) will cause new tasks to wait in the queue whenall corePoolSize threads are busy. Thus, no more than corePoolSizethreads will ever be created. (And the value of the maximumPoolSizetherefore doesn't have any effect.)


关于缩小

如果没有工作可做,为了实现删除所有线程,您必须关闭 coreThreads特别是(默认情况下它们不会终止)。为此,请设置 allowCoreThreadTimeOut(true) 在开始 Pool 之前.

注意设置正确的keep-alive timeout:例如,如果平均每 6 秒收到一个新任务,则将 keep-alive 时间设置为 5 秒可能导致不必要的 erase+create 操作( 哦亲爱的线程,你只需要等待一秒钟!)。根据任务接收收入速度设置此超时时间。

allowCoreThreadTimeOut

Sets the policy governing whether core threads may time out andterminate if no tasks arrive within the keep-alive time, beingreplaced if needed when new tasks arrive. When false, core threads arenever terminated due to lack of incoming tasks. When true, the samekeep-alive policy applying to non-core threads applies also to corethreads. To avoid continual thread replacement, the keep-alive timemust be greater than zero when setting true. This method should ingeneral be called before the pool is actively used.


TL/DR

  • 无界 LinkedBloquingQueue 作为任务队列。
  • corePoolSize替换 maxPoolSize含义
  • allowCoreThreadTimeOut(true) 以允许 Pool 使用基于超时的机制来缩小 的大小,该机制也会影响 coreThreads
  • keep-alive值设置为基于任务接收延迟的逻辑值

这种新鲜组合将导致 ExecutorService 99,99999% 的时间不会阻止提交者 (为此,排队的任务数应为 2.147.483.647 strong>),并且有效地在工作负载的基础上扩展线程数,在 { 0 <--> corePoolSize } 之间波动(双向) 并发线程。

作为建议,应该监控队列的大小,因为非阻塞行为是有代价的:获得 OOM 的概率如果它不受控制地继续增长,则异常(exception),直到 INTEGER.MAX_VALUE满足(例如:如果线程在提交者继续插入任务时死锁一整天)即使任务在内存中的大小可能很小,2.147.483.647 个对象及其相应的链接包装器等...也是很多额外的负载。

关于动态扩展线程数的 Java ExecutorService,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/65304792/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com