- Java锁的逻辑(结合对象头和ObjectMonitor)
- 还在用饼状图?来瞧瞧这些炫酷的百分比可视化新图形(附代码实现)⛵
- 自动注册实体类到EntityFrameworkCore上下文,并适配ABP及ABPVNext
- 基于Sklearn机器学习代码实战
项目开发中,为了统一管理线程,并有效精准地进行排错,我们经常要求项目人员统一使用线程池去创建线程。因为我们是在受不了有些人动不动就去创建一个线程,使用的多了以后,一旦报错就只有一个线程报错信息,还是线程的共用信息,再加上如果你将异常吃了(捕获后不做处理)的情况下,这个错误。。。。em,我实在不知道去哪里排查,不然你换个人试试吧.
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
除此之外还有一个重要的参数:
/**
* If false (default), core threads stay alive even when idle.
* If true, core threads use keepAliveTime to time out waiting
* for work.
*/
private volatile boolean allowCoreThreadTimeOut;//是否允许核心线程数超时退出。
该参数有在特定的业务场景下有很大的意义。比如:你的业务只在晚上需要执行,其余时间无需执行。那么为何不把资源让出来,白天的时候,可以让其他业务占有这些资源去执行呢.
由该类图可知,Executor执行器定义执行方法,ExecutorService定义线程池操作的基本方法,AbstractExecutorService定义了线程池操作的方法模板.
ThreadPoolExecutor任务执行流程图 。
基本的参数校验与赋值,简单代码不过多赘述.
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
////基本的参数校验
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);//将线程对象封装成RunnableFuture
execute(ftask);//任务执行
return ftask;
}
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);//将线程对象封装成RunnableFuture
execute(ftask);//任务执行
return ftask;
}
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);//将线程对象封装成RunnableFuture
execute(ftask);//任务执行
return ftask;
}
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();//获取当前的线程池状态。单个参数,保存了线程池的状态以及线程数量
if (workerCountOf(c) < corePoolSize) { //当线程数量小于核心线程数
if (addWorker(command, true)) //直接添加任务,运行线程
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {//如果核心线程数已经满了,那么直接添加到阻塞队列。
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))//线程池不是running状态,执行拒绝策略。
reject(command);
else if (workerCountOf(recheck) == 0)//线程池线程数量不能为0,需要有一个线程对线程池的后续操作进行处理,比如关闭线程池
addWorker(null, false);
}
else if (!addWorker(command, false))//当核心线程与阻塞队列都满了的时候,直接添加任务到非核心线程运行。添加失败直接执行拒绝策略
reject(command);
}
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS; //运行状态 正常执行任务
private static final int SHUTDOWN = 0 << COUNT_BITS; //关闭线程池,不再接收新任务
private static final int STOP = 1 << COUNT_BITS; //关闭线程池,所有任务停止
private static final int TIDYING = 2 << COUNT_BITS; //中间状态
private static final int TERMINATED = 3 << COUNT_BITS; //线程池已经关闭
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();//获取ctl的快照保存在栈上
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && //如果线程池已经关闭,或者(当前线程池关闭状态当前任务是空且当前工作队列不为空)不满足的情况下直接返回
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))//CAS修改线程池ctl变量,增加线程数
break retry; //添加成功直接退出
c = ctl.get(); // 添加不成功,为了保证多线程运行的安全性,重新获取
if (runStateOf(c) != rs)//当前线程池状态发生改变
continue retry; //直接重新运行retry循环体
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask); //生成自定义的线程woker
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;//这个代码没有意义,mainLock定义的变量为final。可以直接使用
mainLock.lock();//添加work使用锁,保证添加任务的原子性。
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN || //线程池处于running状态
(rs == SHUTDOWN && firstTask == null)) {//线程池处于showdown状态但是firstTask为空。
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)//保存当前线程池中线程的最大数量
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {//添加成功,运行线程
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)//线程启动失败
addWorkerFailed(w);//移除work,减少线程数量
}
return workerStarted;
}
t.start()执行线程任务 。
//Worker类中实际执行任务的方法
public void run() {
runWorker(this);
}
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts //将原始的线程状态为-1修改为0,后续通过getState()>=0获取线程是否已经运行的状态,允许线程中断。-1默认为初始化,此处需要进行处理
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {//task不等于空直接运行,task等于空从workerQueue阻塞队列获取任务
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||//线程池运行状态大于等于STOP
(Thread.interrupted() && //线程是否已经被中断了
runStateAtLeast(ctl.get(), STOP))) &&//鲜橙汁运行状态大于等于STOP
!wt.isInterrupted())//判断任务的线程如果没有被中断
wt.interrupt();//中断当前任务线程
try {
beforeExecute(wt, task);//钩子函数,实际任务运行之前做处理
Throwable thrown = null;
try {
task.run();//执行实际任务代码
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);//钩子函数,实际任务运行之后做处理
}
} finally {
task = null;//将任务置空
w.completedTasks++;//任务完成数加1
w.unlock();
}
}
completedAbruptly = false;//执行过程中是否发成异常
} finally {
processWorkerExit(w, completedAbruptly);
}
}
//执行任务退出操作
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // 如果有异常中断导致任务结束
decrementWorkerCount();//将线程数量减1
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;//完成的任务数量累加
workers.remove(w);//从workers的任务集合中移除当前任务
} finally {
mainLock.unlock();
}
tryTerminate();//尝试关闭线程池
int c = ctl.get();//获取当前线程池的最新状态
if (runStateLessThan(c, STOP)) {//如果当前任务状态小于STOP
if (!completedAbruptly) {//当前任务执行无异常发生
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;//根据allowCoreThreadTimeOut参数获取最小的线程数量
if (min == 0 && ! workQueue.isEmpty())//如果核心线程允许退出,并且工作队列不为空
min = 1;//设置最小值为1,因为最后需要有线程去执行线程池的后续处理,所有线程都没了,后续线程池退出无线程处理
if (workerCountOf(c) >= min)//如果工作的线程数量大于等最小值
return; // replacement not needed 直接返回
}
addWorker(null, false);//如果当前线程数已经小于最小线程数,那么需要保证最小线程数在运行,所以需要有保证线程池的正常运行,添加一个空任务。
}
}
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();//获取当前线程池状态
int rs = runStateOf(c);//获取当前运行状态
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {//如果线程池状态大于等于SHUTDOWN并且(线程数量大于等于STOP或者工作队列为空)
decrementWorkerCount();//将线程池中线程数量减1
return null;
}
int wc = workerCountOf(c);//获取当前线程池的线程数量
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;//判断是否运行核心线程数超时,判断是否需要超时机制
if ((wc > maximumPoolSize || (timed && timedOut))//工作线程大于最大线程池数量或者允许超时并且有超时的情况
&& (wc > 1 || workQueue.isEmpty())) {//并且线程池线程数量大于1或者阻塞队列为空
if (compareAndDecrementWorkerCount(c))//CAS操作将线程池数量减1
return null;//返回空
continue;//CAS失败继续
}
try {
Runnable r = timed ?//允许超时从队列中拿任务并等待keepAliveTime时间
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();阻塞等待
if (r != null)//获取的任务不为空
return r;//直接返回
timedOut = true;//如果为空,超时标志位为true
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();//获取锁
try {
if (w != null)//work不是空
workers.remove(w);//直接从workers中移除当前任务
decrementWorkerCount();//加个ctl中的woker数量减少
tryTerminate();//如果线程池已经是showdown状态,尝试让线程池停止。多线程协作的函数
} finally {
mainLock.unlock();
}
}
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();//检查关闭权限,可以忽略
advanceRunState(SHUTDOWN);//线程池状态递进,由running变为shutdown
interruptIdleWorkers();//中断所有空闲线程
onShutdown(); // hook for ScheduledThreadPoolExecutor钩子函数,调度线程池使用
} finally {
mainLock.unlock();
}
tryTerminate();//尝试将线程池关闭。
}
private void advanceRunState(int targetState) {
for (;;) {
int c = ctl.get();//获取当前的线程状态
if (runStateAtLeast(c, targetState) ||//当前状态已经是大于等于shutdown直接退出
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))//cas操作将线程状态改为targetState。
break;
}
}
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();//获取锁
try {
for (Worker w : workers) {//遍历works中所有的工作任务
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) {//如果没有被中断过,并且可以获得锁,证明属于空闲线程
try {
t.interrupt();//将线程中断,打上中断标志位
} catch (SecurityException ignore) {
} finally {
w.unlock();//解锁
}
}
if (onlyOne)//只中断一个线程标识
break;
}
} finally {
mainLock.unlock();
}
}
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();//权限检查
advanceRunState(STOP);//状态递进 详细方法见上面
interruptWorkers();//中断所有启动的work线程
tasks = drainQueue();//将所有未执行的任务出队保存
} finally {
mainLock.unlock();
}
tryTerminate();//尝试关闭线程池
return tasks;
}
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();//获取锁
try {
for (Worker w : workers)//遍历所有woker进行处理
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {//当前work的状态大于0并且线程不为空且线程未被中断
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
使用getState() >= 0表示当前线程已经启动,runWorker方法中会将其状态从-1改变。证明线程已经启动
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
//标准的入队和出队功能不做过多注释
private List<Runnable> drainQueue() {
BlockingQueue<Runnable> q = workQueue;
ArrayList<Runnable> taskList = new ArrayList<Runnable>();
q.drainTo(taskList);
if (!q.isEmpty()) {
for (Runnable r : q.toArray(new Runnable[0])) {
if (q.remove(r))
taskList.add(r);
}
}
return taskList;
}
final void tryTerminate() {
for (;;) {
int c = ctl.get();//获取当前线程状态ctl
if (isRunning(c) ||//线程池正在运行
runStateAtLeast(c, TIDYING) ||//线程池状态大于等于TIDYING,有其他线程已经改变线程池状态为TIDYING或者TERMINATED了
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))//线程池状态等于shutdown并且工作队列不为空。
return;//以上三种情况线程池无法关闭,需要继续处理
if (workerCountOf(c) != 0) { // Eligible to terminate//当前工作线程数量不等于0
interruptIdleWorkers(ONLY_ONE);//中断线程且只中断一个
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {//cas操作将线程池状态置为TIDYING
try {
terminated();//线程池终止
} finally {
ctl.set(ctlOf(TERMINATED, 0));//设置线程池状态为TERMINATED
termination.signalAll();//信号唤醒所有等待线程
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
线程池的运用在项目中已经成为一种常态,作为一个开发人员最重要的了解其背后的设计原理以及流程,更好地运用线程池,方便提升项目程序的性能以及排查错误。在阅读对应的线程池源码时,我们只局限于单线程的思维,更多的是要去考虑当多线程并发执行时的临界条件。了解设计者的设计初衷、以及设计意图,能让你更好地在项目中运用并设计符合自己项目的线程池。以上是我个人对于线程池ThreadPoolExecutor的理解,不足之处,请多多指教.
最后此篇关于一文带你了解线程池原理的文章就讲到这里了,如果你想了解更多关于一文带你了解线程池原理的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
本文全面深入地探讨了Docker容器通信技术,从基础概念、网络模型、核心组件到实战应用。详细介绍了不同网络模式及其实现,提供了容器通信的技术细节和实用案例,旨在为专业从业者提供深入的技术洞见和实
📒博客首页:崇尚学技术的科班人 🍣今天给大家带来的文章是《Dubbo快速上手 -- 带你了解Dubbo使用、原理》🍣 🍣希望各位小伙伴们能够耐心的读完这篇文章🍣 🙏博主也在学习阶段,如若发
一、写在前面 我们经常使用npm install ,但是你是否思考过它内部的原理是什么? 1、执行npm install 它背后帮助我们完成了什么操作? 2、我们会发现还有一个成为package-lo
Base64 Base64 是什么?是将字节流转换成可打印字符、将可打印字符转换为字节流的一种算法。Base64 使用 64 个可打印字符来表示转换后的数据。 准确的来说,Base64 不算
目录 协程定义 生成器和yield语义 Future类 IOLoop类 coroutine函数装饰器 总结 tornado中的
切片,这是一个在go语言中引入的新的理念。它有一些特征如下: 对数组抽象 数组长度不固定 可追加元素 切片容量可增大 容量大小成片增加 我们先把上面的理念整理在这
文章来源:https://sourl.cn/HpZHvy 引 言 本文主要论述的是“RPC 实现原理”,那么首先明确一个问题什么是 RPC 呢?RPC 是 Remote Procedure Call
源码地址(包含所有与springmvc相关的,静态文件路径设置,request请求入参接受,返回值处理converter设置等等): spring-framework/WebMvcConfigurat
请通过简单的java类向我展示一个依赖注入(inject)原理的小例子虽然我已经了解了spring,但是如果我需要用简单的java类术语来解释它,那么你能通过一个简单的例子向我展示一下吗?提前致谢。
1、背景 我们平常使用手机和电脑上网,需要访问公网上的网络资源,如逛淘宝和刷视频,那么手机和电脑是怎么知道去哪里去拿到这个网络资源来下载到本地的呢? 就比如我去食堂拿吃的,我需要
大家好,我是飞哥! 现在 iptables 这个工具的应用似乎是越来越广了。不仅仅是在传统的防火墙、NAT 等功能出现,在今天流行的的 Docker、Kubernets、Istio 项目中也经
本篇涉及到的所有接口在公开文档中均无,需要下载 GitHub 上的源码,自己创建私有类的文档。 npm run generateDocumentation -- --private yarn gene
我最近在很多代码中注意到人们将硬编码的配置(如端口号等)值放在类/方法的深处,使其难以找到,也无法配置。 这是否违反了 SOLID 原则?如果不是,我是否可以向我的团队成员引用另一个“原则”来说明为什
我是 C#、WPF 和 MVVM 模式的新手。很抱歉这篇很长的帖子,我试图设定我所有的理解点(或不理解点)。 在研究了很多关于 WPF 提供的命令机制和 MVVM 模式的文本之后,我在弄清楚如何使用这
可比较的 jQuery 函数 $.post("/example/handler", {foo: 1, bar: 2}); 将创建一个带有 post 参数 foo=1&bar=2 的请求。鉴于 $htt
如果Django不使用“延迟查询执行”原则,主要问题是什么? q = Entry.objects.filter(headline__startswith="What") q = q.filter(
我今天发现.NET框架在做计算时遵循BODMAS操作顺序。即计算按以下顺序进行: 括号 订单 部门 乘法 添加 减法 但是我四处搜索并找不到任何文档确认 .NET 绝对 遵循此原则,是否有此类文档?如
已结束。此问题不符合 Stack Overflow guidelines .它目前不接受答案。 我们不允许提出有关书籍、工具、软件库等方面的建议的问题。您可以编辑问题,以便用事实和引用来回答它。 关闭
API 回顾 在创建 Viewer 时可以直接指定 影像供给器(ImageryProvider),官方提供了一个非常简单的例子,即离屏例子(搜 offline): new Cesium.Viewer(
As it currently stands, this question is not a good fit for our Q&A format. We expect answers to be
我是一名优秀的程序员,十分优秀!