- Java锁的逻辑(结合对象头和ObjectMonitor)
- 还在用饼状图?来瞧瞧这些炫酷的百分比可视化新图形(附代码实现)⛵
- 自动注册实体类到EntityFrameworkCore上下文,并适配ABP及ABPVNext
- 基于Sklearn机器学习代码实战
本文将深入分析Java线程池的源码,包括线程池的创建、任务提交、工作线程的执行和线程池的关闭等过程。通过对线程池源码的解析,我们能够更好地理解线程池的原理和机制,为我们在实际开发中合理使用线程池提供指导。 文章内容较长,建议找个安静的环境慢慢细读,由于线程池涉及的内容比较多,需要至少熟悉以下知识点(可直接点击跳转阅读):
在传统的多线程编程中,每次需要执行任务时都会创建一个新的线程,任务执行完毕后再销毁该线程。这种方式存在一些问题,例如频繁创建和销毁线程会带来较大的开销,线程数量的不可控会导致系统资源的浪费和性能下降,所以可以采用池化技术来避免这样的性能开销。 线程池的概念包括以下几个要点:
线程池的作用是优化多线程应用程序的性能和可控性,它通过预先创建一组线程,并将任务提交给线程池来执行,避免了频繁创建和销毁线程的开销。线程池会根据任务的数量和系统的负载情况来动态调整线程的数量,以提高系统资源的利用率和任务的响应速度。 通过线程池,我们可以更好地管理线程的生命周期、提高系统的资源利用率,并且能够更好地控制任务的执行顺序和优先级(主要是看使用的是哪个阻塞队列实现)。因此,在开发多线程应用程序时,合理使用线程池是一种推荐的编程模式.
线程池可以复用线程,避免了频繁创建和销毁线程的开销,从而提高了系统的性能。比较适合有大量短期的任务场景.
线程池可以根据系统的负载情况动态调整线程的数量,以适应不同的负载需求,提高了系统资源的利用率.
线程池可以控制并发执行的线程数量,避免了过多的线程竞争资源导致的性能下降和系统崩溃。适用于需要限制系统并发度的场景.
线程池可以将任务提交到任务队列中,根据需要进行排队和调度,从而提高了任务的响应速度。适合一些关心处理速度而结果可稍后获取的场景,比如异步处理、定时任务等.
使用线程池可以将任务的执行和线程的管理分离,简化了多线程编程的复杂性,开发者可集中精力到业务逻辑开发中,任务的执行和管理交由线程池统一协调.
在Web服务器中,线程池可以用来处理客户端的请求。当有新的请求到达时,可以从线程池中获取一个空闲的线程来处理请求,提高服务器的并发处理能力和响应速度.
在并发编程中,线程池可以用来管理并发执行的任务。通过将任务提交到线程池中,线程池会自动分配线程来执行任务,避免了频繁创建和销毁线程的开销,提高了系统的性能和资源利用率.
在异步任务处理中,线程池可以用来执行耗时的任务,将任务的执行和结果的获取分离开来。通过将任务提交到线程池中,可以立即返回一个Future对象,通过该对象可以获取任务的执行结果,提高系统的吞吐量和响应性.
线程池可以用来执行定时任务,通过ScheduledThreadPoolExecutor类提供的定时任务调度功能,可以在指定的时间间隔内周期性执行任务,例如定时任务的执行、定时数据的同步等.
线程池可以用来处理一些后台线程任务,例如日志记录、数据统计等。通过将这些任务提交到线程池中,可以减少对主线程的影响,提高系统的稳定性和响应速度.
线程池的核心组件包括线程池管理器、工作线程、任务队列和拒绝策略.
线程池管理器负责创建和管理线程池的各个组件。它可以根据需要创建线程池,并提供方法来提交任务、关闭线程池等操作.
工作线程是线程池中的实际执行任务的线程。线程池管理器会根据需要创建工作线程,并将任务分配给它们执行。工作线程执行完任务后,会继续从任务队列中获取下一个任务执行.
任务队列用于存放待执行的任务。当线程池中的工作线程执行完任务后,会从任务队列中获取下一个任务执行。任务队列可以是有界队列或无界队列(其实无界队列也是有界的,最大值是Integer.MAX_VALUE),有界队列可以限制任务的数量,避免过多的任务导致系统资源耗尽.
拒绝策略定义了当任务无法被线程池执行时的处理方式。当任务队列已满且无法继续添加任务时,线程池会根据拒绝策略来决定如何处理这个任务。常见的拒绝策略包括抛出异常、丢弃任务、丢弃最旧的任务或在调用者线程中直接执行任务。 这些核心组件共同协作,实现了线程池的功能.
线程池的创建方式包括直接使用构造方法创建和使用线程池工厂创建两种方式.
直接创建线程池是通过实例化ThreadPoolExecutor类来创建线程池。可以使用ThreadPoolExecutor的构造函数来指定线程池的核心线程数、最大线程数、任务队列、拒绝策略等参数,然后调用execute()方法提交任务。 示例代码:
ThreadPoolExecutor executor = new ThreadPoolExecutor(
corePoolSize, // 核心线程数
maximumPoolSize, // 最大线程数
keepAliveTime, // 线程空闲时间
TimeUnit.MILLISECONDS, // 时间单位
new LinkedBlockingQueue<Runnable>(), // 任务队列
new ThreadPoolExecutor.AbortPolicy() // 拒绝策略
);
executor.execute(task); // 提交任务
executor.shutdown(); // 关闭线程池
线程池工厂是通过Executors类提供的静态方法来创建线程池。Executors类提供了一些常用的线程池创建方法,例如newFixedThreadPool()、newCachedThreadPool()、newSingleThreadExecutor()等。这些方法封装了线程池的创建过程,简化了线程池的配置。 示例代码:
ExecutorService executor = Executors.newFixedThreadPool(10); // 创建固定大小的线程池
executor.execute(task); // 提交任务
executor.shutdown(); // 关闭线程池
使用线程池工厂创建线程池可以方便地选择合适的线程池类型,并且无需手动配置线程池的各个参数。但是需要注意,线程池工厂创建的线程池可能不够灵活,不能满足特定的需求,而且默认使用的参数可能对不同的业务场景来说不是最优的,有可能引发性能问题。因此在一些复杂的场景下,直接通过构造方法创建ThreadPoolExecutor类可能更为适合.
这里线程池使用最全参数的构造方法来创建,然后添加两个任务到线程池执行,最后关闭线程池.
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
public class ThreadPoolTest {
public static void main(String[] args) throws Exception {
DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 10, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(200), new MyThreadFactory(), new MyRejectedExecutionHandler());
System.out.println(LocalDateTime.now().format(dateTimeFormatter) + "线程池创建完毕!");
// 执行任务,类型是Runnable,不带返回值
threadPoolExecutor.execute(() -> {
System.out.println(LocalDateTime.now().format(dateTimeFormatter) + Thread.currentThread().getName() + "执行完成!");
});
// 提交任务,类型是Callable,返回值是Future类型,可观测任务状态
Future<String> future = threadPoolExecutor.submit(() -> {
System.out.println(LocalDateTime.now().format(dateTimeFormatter) + Thread.currentThread().getName() + "开始执行!");
Thread.sleep(3000);
System.out.println(LocalDateTime.now().format(dateTimeFormatter) + Thread.currentThread().getName() + "执行完成并返回了值!");
return "ok";
});
System.out.println(LocalDateTime.now().format(dateTimeFormatter) + "future.isDone():" + future.isDone() + ",future.get():" + future.get());
// 休眠4秒,等future执行完成后获取结果
Thread.sleep(4000);
System.out.println(LocalDateTime.now().format(dateTimeFormatter) + "future.isDone():" + future.isDone() + ",future.get():" + future.get());
// 调用shutdown方法不再接受新任务,会将队列里的任务全部执行完成后再关闭线程池
threadPoolExecutor.shutdown();
System.out.println(LocalDateTime.now().format(dateTimeFormatter) + "线程池关闭!");
// 调用shutdownNow方法会返回线程池关闭后未执行的任务
// List<Runnable> remainder = threadPoolExecutor.shutdownNow();
}
static class MyThreadFactory implements ThreadFactory {
private final AtomicInteger threadNumber = new AtomicInteger(1);
public Thread newThread(Runnable r) {
return new Thread(r, "myThreadPool-thread-" + threadNumber.getAndIncrement());
}
}
static class MyRejectedExecutionHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println("自定义拒绝策略");
// 当前线程执行
r.run();
}
}
}
执行结果:
2023-07-21 18:36:40线程池创建完毕!
2023-07-21 18:36:40myThreadPool-thread-1执行完成!
2023-07-21 18:36:40myThreadPool-thread-2开始执行!
2023-07-21 18:36:43myThreadPool-thread-2执行完成并返回了值!
2023-07-21 18:36:40future.isDone():false,future.get():ok
2023-07-21 18:36:47future.isDone():true,future.get():ok
2023-07-21 18:36:47线程池关闭!
注意:future.get()方法如果线程没有执行结束,则阻塞等待直到返回结果,解释下打印结果: “2023-07-21 18:36:43myThreadPool-thread-2执行完成并返回了值!”先于“2023-07-21 18:36:40future.isDone():false,future.get():ok”打印就是因为future.get()的阻塞等待,而future.isDone()不会阻塞,所以在主线程休眠前线程未执行完成,拿到的是false,休眠后线程执行完成,拿到的是true 。
线程池的各个参数包括核心线程数(corePoolSize)、最大线程数(maximumPoolSize)、线程空闲时间(keepAliveTime)、任务队列(workQueue)和拒绝策略(handler).
核心线程数指的是线程池中一直保持活动的线程数量,即使它们处于空闲状态(除非设置允许核心参数超时销毁)。当有新任务提交时,线程池会优先创建核心线程来执行任务,直到达到核心线程数.
最大线程数指的是线程池中允许的最大线程数量。当线程池中线程数量超过核心线程数并且任务队列已满时,线程池会创建新的线程来执行任务,直到达到最大线程数。超过最大线程数的任务将根据拒绝策略进行处理.
线程空闲时间指的是当线程池中的线程数量超过核心线程数时,多余的空闲线程在被回收之前的等待时间。如果线程在等待时间内没有任务可执行,则会被终止并从线程池中移除.
任务队列用于存放待执行的任务,采用的是阻塞队列,获取元素时如果队列为空则阻塞直到队列中放入任务被唤醒。当线程池中的线程数量达到核心线程数时,新的任务会被放入任务队列中等待执行。任务队列可以是有界队列(如ArrayBlockingQueue)或无界队列(如LinkedBlockingQueue,虽然是链表结构,但是也是有长度的,最大值是Integer.MAX_VALUE),有界队列可以限制任务的数量,避免过多的任务导致系统资源耗尽.
拒绝策略定义了当任务无法被线程池执行时的处理方式。当任务队列已满且无法继续添加任务时,线程池会根据拒绝策略来决定如何处理这个任务。常见的拒绝策略包括抛出异常(AbortPolicy)、丢弃任务(DiscardPolicy)、丢弃最旧的任务(DiscardOldestPolicy)或在调用者线程中直接执行任务(CallerRunsPolicy)。 通过合理配置这些参数,可以根据实际需求来优化线程池的性能和资源利用率。例如,适当调整核心线程数和最大线程数可以平衡线程池的并发度和资源消耗;合理选择任务队列类型可以控制任务的排队策略;选择合适的拒绝策略可以根据业务需求来处理无法执行的任务.
熟悉以上线程池相关的基础知识后,我们接下来通过线程池的管理和任务的处理流程来分析源码.
先看一下类的结构图 。
Executor接口只定义了一个基础的execute方法.
public interface Executor {
/**
* 核心且基础的任务执行方法
*/
void execute(Runnable command);
}
ExecutorService接口定义了线程池的一些常用操作.
public interface ExecutorService extends Executor {
/**
* 终止线程池,不再接受新任务,会将阻塞队列的任务执行完成
*/
void shutdown();
/**
* 立即终止线程池,阻塞队列的任务不再执行,返回未执行任务集合
*/
List<Runnable> shutdownNow();
/**
* 判断线程池状态是否停掉,只要线程池不是RUNNING状态,都返回true
*/
boolean isShutdown();
/**
* 判断线程池是否完全终止,状态是TERMINATED
*/
boolean isTerminated();
/**
* 阻塞等待,直到线程池是TERMINATED状态
*/
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
/**
* 提交Callable任务,返回Future
*/
<T> Future<T> submit(Callable<T> task);
/**
* 提交Runnable任务,返回Future,Future的get方法返回值就是result参数
*/
<T> Future<T> submit(Runnable task, T result);
/**
* 提交Runnable任务,返回Future,Future的get方法返回值是null
*/
Future<?> submit(Runnable task);
/**
* 执行所有的Callable任务集合
*/
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
/**
* 执行所有的Callable任务集合,等待返回带超时时间
*/
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
/**
* 执行所有的Callable任务集合,返回其中最先执行完成的一个任务结果
*/
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
/**
* 执行所有的Callable任务集合,返回其中最先执行完成的一个任务结果,等待返回带超时时间
*/
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
AbstractExecutorService是一个抽象类,实现了接口的一些方法,未实现的方法继续留给子类实现.
public abstract class AbstractExecutorService implements ExecutorService {
/**
* 将Runnable封装成RunnableFuture
*/
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
/**
* 将Callable封装成RunnableFuture
*/
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
/**
* 提交Runnable任务,返回Future,真正执行任务的是子类实现的execute方法
*/
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
/**
* 提交Runnable任务,返回Future,Future的get方法返回值是result参数的值
*/
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
/**
* 提交Callable任务,返回Future
*/
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
// 其他invoke相关方法默认实现使用场景较少,这里不再列出具体方法逻辑了,可自行阅读源码
...
}
ThreadPoolExecutor类是线程池核心类,也是本文重点分析的源码类,该类具体实现了创建线程池、提交任务、添加工作线程、终止线程池等操作.
ScheduledThreadPoolExecutor是可任务调度的线程池通过扩展ThreadPoolExecutor实现的,比如典型的延迟执行等功能.
接下来会通过各个环节分析ThreadPoolExecutor源码及流程 。
设计的比较巧妙,将线程池状态和线程数采用一个int类型存储的,操作使用时都是基于位操作的,int整型占4个字节32位,使用高3位表示线程池状态,其余29位表示线程数量,所以线程池理论上最大的线程数是2^29-1=536870911。 计算机中二进制是补码形式表示的,正数的补码跟原码相同,负数的补码是负数的绝对值的反码+1,最高位是1,不熟悉的伙伴可以看这篇文章, java位运算及移位运算你还记得吗 。
// 值是29,两个意思,一是表示最大线程数量占用的二进制位数,二是左移29位可以取得线程池状态。
private static final int COUNT_BITS = Integer.SIZE - 3;
// 线程池RUNNING状态,-1的二进制是:11111111 11111111 11111111 11111111,左移29位的结果,二进制表示为:11100000 00000000 00000000 00000000,高三位是111,表示的整数就是-536870912
private static final int RUNNING = -1 << COUNT_BITS;
// 线程池SHUTDOWN状态,0的二进制是:00000000 00000000 00000000 00000000,左移29位的结果,二进制表示为:00000000 00000000 00000000 00000000,高三位是000,表示的整数就是0
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 线程池STOP状态,1的二进制是:00000000 00000000 00000000 00000001,左移29位的结果,二进制表示为:00100000 00000000 00000000 00000000,高三位是001,表示的整数就是536870912
private static final int STOP = 1 << COUNT_BITS;
// 线程池TIDYING状态,2的二进制是:00000000 00000000 00000000 00000010,左移29位的结果,二进制表示为:01000000 00000000 00000000 00000000,高三位是010,表示的整数就是1073741824
private static final int TIDYING = 2 << COUNT_BITS;
// 线程池TERMINATED状态,3的二进制是:00000000 00000000 00000000 00000011,左移29位的结果,二进制表示为:01100000 00000000 00000000 00000000,高三位是011,表示的整数就是1610612736
private static final int TERMINATED = 3 << COUNT_BITS;
private static final int COUNT_BITS = Integer.SIZE - 3;
// CAPACITY表示线程池中线程的容量
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 获取线程池状态,~CAPACITY取反的结果就是高3位是1,低29位是0,然后跟c与操作,最终就是c的高3位+29位0的二进制转换成int就是线程池的状态
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 获取线程数量,CAPACITY高3位是0低29位是1,然后跟c与操作,最终就是高3位是0+c的低29位结果的二进制转换成int就是线程数量
private static int workerCountOf(int c) { return c & CAPACITY; }
// 将线程池状态rs、线程数量wc执行或操作,ctlOf方法返回的int值就包含了这两个属性,用一个整数表示了。
private static int ctlOf(int rs, int wc) { return rs | wc; }
// 判断线程池状态是否小于s
private static boolean runStateLessThan(int c, int s) {
return c < s;
}
// 判断线程池状态至少大于s
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
// 判断线程池是否在运行中
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
// cas操作,将线程数加1
private boolean compareAndIncrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect + 1);
}
// cas操作,将线程数减1
private boolean compareAndDecrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect - 1);
}
// 将线程数减1,自旋操作,直到减1成功
private void decrementWorkerCount() {
do {} while (! compareAndDecrementWorkerCount(ctl.get()));
}
// 存放任务的阻塞队列
private final BlockingQueue<Runnable> workQueue;
// 线程池相关操作锁,全局锁,重要操作需要上锁,比如:添加工作线程、关闭线程池等
private final ReentrantLock mainLock = new ReentrantLock();
// 线程池里的工作线程就是存储在这个hashset里
private final HashSet<Worker> workers = new HashSet<Worker>();
// 锁的条件变量,线程值终止时会用到
private final Condition termination = mainLock.newCondition();
// 保存的时线程池达到过的最大线程数量
private int largestPoolSize;
// 线程池中所有线程总共执行完成的任务数量
private long completedTaskCount;
// 创建线程的工厂类,可自定义
private volatile ThreadFactory threadFactory;
// 拒绝策略,可自定义
private volatile RejectedExecutionHandler handler;
// 非核心线程最大空闲时间,(如果允许核心线程退出,也使用这个空闲时间)
private volatile long keepAliveTime;
// 是否允许核心线程超时销毁,默认false,可设置为true,最低也需要有一个核心线程
private volatile boolean allowCoreThreadTimeOut;
// 核心线程数
private volatile int corePoolSize;
// 最大线程数
private volatile int maximumPoolSize;
// 拒绝策略
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();
线程池的创建有两种方式,线程池对象创建完成后,线程池的状态就默认是RUNNING状态.
其中,必须指定参数有以下几个.
另外两个参数有默认值,也可以自定义.
例如:
ThreadPoolExecutor threadPoolExecutor1 = new ThreadPoolExecutor(2, 8, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(200));
ThreadPoolExecutor threadPoolExecutor2 = new ThreadPoolExecutor(2, 10, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(200), new MyThreadFactory(), new MyRejectedExecutionHandler());
可以看到Executors提供了很多创建线程池的方法,其实底层也是调用ThreadPoolExecutor构造方法,只不过很多都是给的默认值而已。 随便看一个newFixedThreadPool方法,这个空闲时间是0,相当于线程获取任务时直接返回不阻塞,线程池的核心线程数和最大线程数是一样的.
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
另外,很多java规范不建议通过Executors这种方式创建线程池,就是因为很多参数给的都是默认值,比如核心线程数跟最大线程数一致,无界队列(最大值Integer.MAX_VALUE)、线程最大空闲时间为0等,可能由于这些默认参数不满足业务场景导致CPU飙升内存溢出等问题,因结合评估业务场景使用合适的参数创建线程池.
拒绝策略是在阻塞队列达到最大值了而且线程数也达到最大值了,线程池无法再处理新提交过来的任务,这时候可以根据拒绝策略来对任务进行处理。在ThreadPoolExecutor默认采用的是AbortPolicy策略即抛异常处理。 拒绝策略接口是RejectedExecutionHandler,源码方法如下 。
public interface RejectedExecutionHandler {
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
在ThreadPoolExecutor线程池中默认有四种实现,也可以自定义拒绝策略的逻辑,实现这个接口即可.
这种策略是在当前提交任务的线程来执行任务.
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public CallerRunsPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
默认的策略,抛异常终止任务.
public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
什么都不做,相当于直接丢弃任务,简单粗暴,任务不会被执行,实际应用中要慎重.
public static class DiscardPolicy implements RejectedExecutionHandler {
public DiscardPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
丢弃最先放入阻塞队列的任务,然后再重新再把任务提交到线程池.
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
public DiscardOldestPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
从任务提交到执行结束,先看一下总流程,然后再分析细节流程.
线程池创建完成之后,就可以提交任务到线程池去执行了,提交任务有两种方法,一种是submit,一种是execute。submit方法提交到线程池后会返回一个Future对象,可以使用Future跟踪线程执行是否执行完成及获取结果,其实是将任务封装成RunnableFuture对象,真正的执行也是调用的execute方法,而execute方法不带返回值.
// 提交Runnable任务,返回Future对象
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
// 提交Runnable任务,返回Future对象,带返回值result
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
// 提交Callable任务,返回Future对象,具体的返回值是实现Callable的call方法的返回值。
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
// 执行任务的具体实现方法
public void execute(Runnable command) {
// 任务不能为null
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 线程池中的线程数小于核心核心线程数,则添加核心线程,由核心线程去执行任务,然后核心线程会循环从阻塞队列等待获取任务并执行。
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 走到这里有两种情况,1:线程池的线程数已经>=核心线程数了,2:线程池的线程数没有达到核心线程数但添加核心线程失败了。如果线程还是Running状态,则先把任务添加到阻塞队列里
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
// 线程池非Running状态并且移除阻塞队列移除该任务成功,则执行拒绝策略。
reject(command);
else if (workerCountOf(recheck) == 0)
// 工作线程数是0的情况下,增加一个新工作线程
addWorker(null, false);
}
// 走到这里说明线程数已经>=核心线程数了,而且往阻塞队列添加任务也失败了,可能是队列满了,这时候就尝试添加非核心线程,成功则结果,失败则执行拒绝策略。
else if (!addWorker(command, false))
reject(command);
}
final void reject(Runnable command) {
// 拒绝策略,调用构造线程池时传递的RejectedExecutionHandler对象rejectedExecution方法
handler.rejectedExecution(command, this);
}
常用的场景下,有两种情况会添加工作线程,一种是线程池中线程数还没有达到核心线程数时,有任务提交过来时会创建新的工作线程,还有一种情况是如果核心工作线程已经达到核心线程数,并且阻塞队列的任务也放满时,这时才会去创建非核心线程。 其实线程本质上不区分核心线程和非核心线程,只是在线程空闲时超过核心线程数的线程可销毁,达到动态调整线程数的目的,节省系统资源。 先看添加工作线程的流程图,然后再分析源码 添加工作线程可以分为两个阶段:
/*
* 添加工作线程
* @param firstTask 具体任务,可以为null
* @param core 是否核心线程表示,线程本质是不区分核心还是分核心的,只是用这个来标识要添加哪一种类型的线程,做判断用的。比如true的情况下,当前线程池的线程数要 < corePoolSize才能继续添加线程,为false的情况下,当前线程池的线程数要 < maximumPoolSize才能继续添加线程
*/
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 判断是否需要添加工作线程,线程池状态>=SHUTDOWN时,再判断如果状态>SHUTDOWN或者传参firstTask不等于null或者阻塞队列没有任务这三种情况则不添加新的工作线程。
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
// 这里采用自旋+cas操作,目的就是将工作线程数加1
for (;;) {
int wc = workerCountOf(c);
// 目前线程数>=线程池可以创建的理论线程数最大值则不再创建工作线程,或者要创建核心线程但线程池线程数已经达到corePoolSize,或者要创建非核心线程但线程池线程数已经达到maximumPoolSize,这两种情况也不再创建工作线程,直接返回添加工作线程失败标识。
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// cas操作线程数加1,成功则跳出内层循环
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
// 因为addWorker没有加锁操作,这里再判断一次线程池状态,拿到ctl最新值
if (runStateOf(c) != rs)
continue retry;
}
}
// 以上只是将ctl的线程数加1了,以下是真正的创建一个工作线程
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 构造一个Worker对象,构造方法里就实例化了一个具体的Thread线程
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
// 创建工作线程时,需要加锁,防止其他线程并发操作。
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
// 判断线程池状态,并将Worker放入工作线程集合里
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
// 这里就是标记线程池里创建线程的最大值,这个值最大也不会超过maximumPoolSize。
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 启动工作线程
t.start();
workerStarted = true;
}
}
} finally {
// 如果工作线程没有启动,说明添加工作线程失败,需要把之前ctl线程数加1的操作回滚,就是ctl线程数减1,如果已经添加到工作线程集合里也需要移除
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
// 涉及工作线程的相关操作都需要加锁
mainLock.lock();
try {
// 从工作线程集合里移除worker
if (w != null)
workers.remove(w);
// cas操作ctl线程数减1
decrementWorkerCount();
// 判断是否需要终止线程池
tryTerminate();
} finally {
mainLock.unlock();
}
}
addWorker方法有两个参数,第一个参数表示任务,第二个参数表示是要创建核心线程还是非核心线程,这两个参数的组合及作用如下.
firstTask(Runnable) | core(boolean) | 作用 |
---|---|---|
command | true | 将要创建核心线程,command任务不放入队列,直接在创建的线程中执行,一般是核心线程数未达到corePoolSize时使用 |
command | false | 这种情况是在核心线程数已达到corePoolSize,将任务放入阻塞队列时失败了,所以这时候就创建非核心线程并将当前任务command立即执行 |
null | true | 这种情况一般是在线程池预热情况下使用,比如调用prestartCoreThread方法预热启动一个线程,prestartAllCoreThreads预热启动所有核心线程 |
null | false | 这种一般是阻塞队列中还有任务,但线程池没有工作线程了,这时候就创建一个工作线程去阻塞队列拿任务执行 |
Worker类是ThreadPoolExecutor内部类,就是具体的工作线程类,继承了AbstractQueuedSynchronizer并实现了Runnable接口,主要利用了AQS的同步机制来控制对工作线程的并发访问.
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable {
// Worker的Thread属性,其实干活的就是这个线程
final Thread thread;
// 任务
Runnable firstTask;
// 线程已经执行完成的任务总数
volatile long completedTasks;
// 构造方法
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
// 以当前对象创建Thread线程,线程执行时调用的就是这个类的run方法
this.thread = getThreadFactory().newThread(this);
}
// run方法执行任务,调用的是外部ThreadPoolExecutor的runWorker方法
public void run() {
runWorker(this);
}
...
}
添加worker工程线程完成之后,会启动这个工作线程,执行Woker对象的run方法.
public void run() {
// 其实线程真正的执行逻辑是外部的ThreadPoolExecutor类的runWorker方法
runWorker(this);
}
我们看看java.util.concurrent.ThreadPoolExecutor#runWorker这个方法,这个方法是非常核心和重要的,真正执行业务逻辑就是通过这里调用的,而且工作线程也是通过while循环不断的去获取任务并执行从而达到线程复用的目的。 先看一下runWorker的流程图 工作线程结束有两种情况,一种是执行任务过程中发生异常,会将异常抛出,当前线程结束,还有一种情况是Woker的task为null或者getTask方法从阻塞队列未获取到任务,线程正常销毁结束.
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 如果获取的任务为null,则线程会销毁,其实线程的复用核心就是在这里,线程在while循环中不停的去获取任务并执行。
while (task != null || (task = getTask()) != null) {
w.lock();
// 判断线程的状态
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 可自定义任务执行前的逻辑,默认空实现
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 执行任务的业务逻辑
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 可自定义任务执行后的逻辑,默认空实现
afterExecute(task, thrown);
}
} finally {
task = null;
// 完成的任务数加1
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 没有拿到任务,线程销毁操作,completedAbruptly标识,false:正常退出,true:异常退出
processWorkerExit(w, completedAbruptly);
}
}
任务的获取主要是getTask方法实现。 (1)如果这个方法返回null说明可能是线程池要关闭不再接受任务并且阻塞队列也没有任务了,也有可能是阻塞队列中没有任务了,则后续需要销毁这个工作线程。 (2)如果返回Runnable说明从阻塞队列拿到了任务,后续继续执行任务。 看一下getTask的流程图 。
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
// 循环调用,其中会判断线程池状态
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 线程池即将关闭状态,如果阻塞队列中也没有任务了,返回null,runWorker方法没有拿到task则退出while循环,销毁线程。
// 这里根据shutdown和shutdownNow设置不同的线程池状态走不同的逻辑
// 如果线程池状态是STOP则直接线程数减1,然后返回null,runWorker方法会退出while循环,线程销毁
// 如果线程池状态是SHUTDOWN则再看看阻塞队列是否为空,为空则线程数减1,后续线程销毁,不会空则继续获取任务
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
// 获取当前线程数
int wc = workerCountOf(c);
// 是否允许超时标识,allowCoreThreadTimeOut核心线程是否允许超时
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 非核心线程过多或者允许超时的情况下,如果队列为空则工作线程减1,后续销毁线程,这里就返回null
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 允许核心线程超时或者线程数大于核心线程数时,采用poll取数据,非阻塞,超过keepAliveTime没有获取到数据就继续自旋获取任务,
// 不允许核心线程超时或者线程数小于等于核心线程数时,采用take取数据,阻塞等待直到获取到任务或者被中断
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
从getTask方法返回,只有两种结果,一种是拿到了任务,执行任务后接着进行下一次循环继续拿任务,还有一种是返回null,表示未获取到任务,这就是空闲线程,可以销毁了,这时已经在getTask方法中奖ctl的线程数减1了,在runWorker方法的processWorkerExit正常将线程从工作线程集合移除即可。 runWorker方法中在finally块中调用的processWorkerExit方法.
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 如果completedAbruptly为true,说明可能异常导致线程退出,这是非正常退出,为保证线程数量需要重新再创建一个线程,所以这里先将线程数减1,正常退出时已经在getTask方法线程数减过1了。
if (completedAbruptly)
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 总的任务完成后把当前即将销毁的线程完成的任务数加上
completedTaskCount += w.completedTasks;
// 线程集合里移除这个工作线程
workers.remove(w);
} finally {
mainLock.unlock();
}
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
// 线程池在STOP状态前,如果是正常退出,任务队列还有任务的情况下,最少还得一个线程,如果线程池线程数>=1则不再创建,否则新建一个工作线程。如果是异常退出则直接新建一个工作线程。
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
// 这里会再添加一个线程
addWorker(null, false);
}
}
再看一下tryTerminate方法,这个方法是在线程要销毁或者线程池要关闭时会调用,主要是判断是否需要中断空闲的线程,而且逻辑中调用了terminated()方法,可以自定义扩展实现.
final void tryTerminate() {
for (;;) {
int c = ctl.get();
// 线程池在运行状态或者阻塞队列不为空时直接返回
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
// 如果工作线程数>0,则中断一个线程
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 走到这里,说明工作线程数为0了,cas操作将线程状态置为TIDYING,这是个过渡状态,线程数设置为0
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
// 默认方法体为空,可作为扩展实现,线程池即将STOP状态时会调这个方法,相当于从外界感知线程池真正关闭的通知
terminated();
} finally {
// 最终会将线程池状态设置为TERMINATED
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
}
}
线程池关闭有两种方法,一种是shutdown,另外一种是shutdownNow 。
将线程池状态设置为SHUTDOWN后不再接受新任务,将阻塞队列的任务执行完成后,线程池关闭.
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// 设置线程池状态为SHUTDOWN
advanceRunState(SHUTDOWN);
// 给worker线程发中断标识
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
private void advanceRunState(int targetState) {
for (;;) {
int c = ctl.get();
// cas操作设置线程池状态为targetState
if (runStateAtLeast(c, targetState) ||
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
break;
}
}
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) {
try {
// 中断工作线程,其实工作线程仍会继续走一遍循环,主要判断线程池状态和阻塞队列任务数来决定是否销毁
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
调用shutdownNow方法会将线程池状态设置为STOP后不再接受新任务,然后将所有线程中断(这里的中断已经拿到任务并执行不会响应中断,是在调用getTask获取下一个任务时看线程池状态为STOP则不会再取阻塞队列任务,直接返回null,然后工作线程销毁,还有一种情况是正在阻塞等待拿任务,阻塞在poll或take上,都会响应中断,然后再一次循环任务返回null),并将未执行的任务返回,线程池关闭.
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// 设置线程池状态为STOP
advanceRunState(STOP);
// 中断所有线程
interruptWorkers();
// 将阻塞队列的剩余任务导出到新集合中并返回
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}
// Worker类方法
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
// 中断工作线程,其实工作线程仍会继续走一遍循环,主要判断线程池状态和阻塞队列任务数来决定是否销毁
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
一般情况下,要关闭线程池建议使用shutdown,它会将阻塞队列的任务执行完成后再关闭线程池,比较稳妥,不会出现任务没有执行完的情况,如果要使用shutdownNow关闭线程池,则它会返回未执行的任务,需要根据业务情况做相应任务的处理.
前文已经将线程池的5种状态列举出来了,我们看下这5种状态的流转.
线程池的扩展机制包括任务前后的处理、线程池状态的监听和自定义拒绝策略.
public class MyThreadPoolExecutorTest {
public static void main(String[] args) {
MyThreadPoolExecutor myThreadPoolExecutor = new MyThreadPoolExecutor(2, 10, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(200));
myThreadPoolExecutor.execute(() -> {
try {
System.out.println(Thread.currentThread().getName() + "我是任务1");
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
myThreadPoolExecutor.execute(() -> {
try {
System.out.println(Thread.currentThread().getName() + "我是任务2");
Thread.sleep(3000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
myThreadPoolExecutor.shutdown();
}
}
class MyThreadPoolExecutor extends ThreadPoolExecutor {
private ThreadLocal<Long> threadLocal = new ThreadLocal<>();
public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, @NotNull TimeUnit unit, @NotNull BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
new MyThreadFactory(), new MyRejectedExecutionHandler());
}
@Override
protected void beforeExecute(Thread t, Runnable r) {
System.out.println(Thread.currentThread().getName() + "执行业务逻辑前执行beforeExecute这个方法");
threadLocal.set(System.currentTimeMillis());
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
System.out.println(Thread.currentThread().getName() + "执行业务逻辑后执行afterExecute这个方法,执行业务逻辑耗时:" + (System.currentTimeMillis() - threadLocal.get()) + "ms");
threadLocal.remove();
}
@Override
protected void terminated() {
System.out.println("线程池关闭啦");
}
}
class MyThreadFactory implements ThreadFactory {
private final AtomicInteger threadNumber = new AtomicInteger(1);
public Thread newThread(Runnable r) {
return new Thread(r, "myThreadPool-thread-" + threadNumber.getAndIncrement());
}
}
class MyRejectedExecutionHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println("自定义拒绝策略");
// 当前线程执行
r.run();
}
}
输出:
myThreadPool-thread-1执行业务逻辑前执行beforeExecute这个方法
myThreadPool-thread-1我是任务1
myThreadPool-thread-2执行业务逻辑前执行beforeExecute这个方法
myThreadPool-thread-2我是任务2
myThreadPool-thread-1执行业务逻辑后执行afterExecute这个方法,执行业务逻辑耗时:1009ms
myThreadPool-thread-2执行业务逻辑后执行afterExecute这个方法,执行业务逻辑耗时:3004ms
线程池关闭啦
通过对Java线程池源码的深入分析,我们了解了线程池的原理和机制。合理使用线程池可以提高程序的性能和稳定性,减少资源的消耗。在实际开发中,我们应根据具体的需求和场景选择合适的线程池配置,并结合线程池的扩展机制进行定制化开发。希望本文能够帮助读者更好地理解和使用Java线程池.
最后此篇关于万字长文深度解读Java线程池,硬核源码分析的文章就讲到这里了,如果你想了解更多关于万字长文深度解读Java线程池,硬核源码分析的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
本文使用第一人称来介绍Redis 一、概述 Redis,英文全称是Remote Dictionary Server(远程字典服务),是一个开源的使用ANSI C语言编写、支持网络、可基于
这个问题已经有答案了: 奥 git _a (12 个回答) 已关闭 7 年前。 我正在为一个类项目创建一个正则表达式解析器,但我遇到了一个不知道如何解决的问题。每当遇到带括号的组时,都应该向 grou
我在玩简单的牛轧糖和十字游戏时遇到问题,我试图设置获胜条件,在击中三个位置/ block 时,它会显示一条消息,声明您已赢得游戏并禁用所有其他位置/ block 但是我的行为很奇怪,例如,如果您尝试通
我是一名优秀的程序员,十分优秀!