Java 线程池实现的原理
如果只讲线程池的使用,那这篇文章没有什么大的价值,充其量也就是熟悉 Executor 相关 API 的过程。
线程池的实现过程没有用到 Synchronized 关键字,用的都是 Volatile,Lock 和同步(阻塞)队列,Atomic 相关类,FutureTask 等等,因为后者的性能更优。
理解的过程可以很好的学习源码中并发控制的思想。
线程池的优点是可总结为以下三点:
线程复用
控制最大并发数
管理线程
1.线程复用过程
理解线程复用原理首先应了解线程生命周期。
在线程的生命周期中,它要经过新建(New)、就绪(Runnable)、运行(Running)、阻塞(Blocked)和死亡(Dead)5种状态。
Thread 通过 new 来新建一个线程,这个过程是是初始化一些线程信息,如线程名,id,线程所属 group 等,可以认为只是个普通的对象。
调用 Thread的start() 后 Java 虚拟机会为其创建方法调用栈和程序计数器,同时将 hasBeenStarted 为 true,之后调用 start 方法就会有异常。
处于这个状态中的线程并没有开始运行,只是表示该线程可以运行了。至于该线程何时开始运行,取决于 JVM 里线程调度器的调度。
当线程获取 cpu 后,run() 方法会被调用。不要自己去调用 Thread 的 run() 方法。
之后根据 CPU 的调度在就绪——运行——阻塞间切换,直到 run() 方法结束或其他方式停止线程,进入 dead 状态。
所以实现线程复用的原理应该就是要保持线程处于存活状态(就绪,运行或阻塞)。
接下来来看下 ThreadPoolExecutor 是怎么实现线程复用的。
在 ThreadPoolExecutor 主要 Worker 类来控制线程的复用。
看下 Worker 类简化后的代码,这样方便理解:
private final class Worker implements Runnable { final Thread thread;
Runnable firstTask;
Worker(Runnable firstTask) { this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this);
} public void run() {
runWorker(this);
} final void runWorker(Worker w) {
Runnable task = w.firstTask;
w.firstTask = null; while (task != null || (task = getTask()) != null){
task.run();
}
}
Worker 是一个 Runnable,同时拥有一个 thread,这个 thread 就是要开启的线程,在新建 Worker 对象时同时新建一个 Thread 对象,同时将 Worker 自己作为参数传入 TThread,这样当T hread的start() 方法调用时,运行的实际上是 Worker的run() 方法,接着到 runWorker() 中,有个 while 循环,一直从 getTask() 里得到 Runnable 对象,顺序执行。
getTask() 又是怎么得到 Runnable 对象的呢?
依旧是简化后的代码:
private Runnable getTask() { if(一些特殊情况) { return null;
}
Runnable r = workQueue.take(); return r;
}
这个 workQueue 就是初始化 ThreadPoolExecutor 时存放任务的 BlockingQueue 队列,这个队列里的存放的都是将要执行的 Runnable 任务。
因为 BlockingQueue 是个阻塞队列,BlockingQueue.take() 得到如果是空,则进入等待状态直到 BlockingQueue 有新的对象被加入时唤醒阻塞的线程。
所以一般情况 Thread的run() 方法就不会结束,而是不断执行从 workQueue 里的 Runnable 任务,这就达到了线程复用的原理了。
2.控制最大并发数
那 Runnable 是什么时候放入 workQueue?
Worker 又是什么时候创建,Worker 里的 Thread 的又是什么时候调用 start() 开启新线程来执行 Worker 的 run() 方法的呢?
有上面的分析看出 Worker 里的 runWorker() 执行任务时是一个接一个,串行进行的,那并发是怎么体现的呢?
很容易想到是在 execute(Runnable runnable) 时会做上面的一些任务。
看下 execute 里是怎么做的。
execute:
简化后的代码
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); // 当前线程数 < corePoolSize
if (workerCountOf(c) < corePoolSize) { // 直接启动新的线程。
if (addWorker(command, true)) return;
c = ctl.get();
} // 活动线程数 >= corePoolSize
// runState为RUNNING && 队列未满
if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); // 再次检验是否为RUNNING状态
// 非RUNNING状态 则从workQueue中移除任务并拒绝
if (!isRunning(recheck) && remove(command))
reject(command);// 采用线程池指定的策略拒绝任务
// 两种情况:
// 1.非RUNNING状态拒绝新的任务
// 2.队列满了启动新的线程失败(workCount > maximumPoolSize)
} else if (!addWorker(command, false)) reject(command);
}
addWorker:
简化后的代码
private boolean addWorker(Runnable firstTask, boolean core) { int wc = workerCountOf(c); if (wc >= (core ? corePoolSize : maximumPoolSize)) { return false;
}
w = new Worker(firstTask); final Thread t = w.thread;
t.start();
}
根据代码再来看上面提到的线程池工作过程中的添加任务的情况:
* 如果正在运行的线程数量小于 corePoolSize,那么马上创建线程运行这个任务; * 如果正在运行的线程数量大于或等于 corePoolSize,那么将这个任务放入队列;* 如果这时候队列满了,而且正在运行的线程数量小于 maximumPoolSize,那么还是要创建非核心线程立刻运行这个任务;* 如果队列满了,而且正在运行的线程数量大于或等于 maximumPoolSize,那么线程池会抛出异常RejectExecutionException。
这就是 Android 的 AsyncTask 在并行执行是在超出最大任务数是抛出 RejectExecutionException 的原因所在。
通过 addWorker 如果成功创建新的线程成功,则通过 start() 开启新线程,同时将 firstTask 作为这个 Worker 里的 run() 中执行的第一个任务。
虽然每个 Worker 的任务是串行处理,但如果创建了多个 Worker,因为共用一个 workQueue,所以就会并行处理了。
所以根据 corePoolSize 和 maximumPoolSize 来控制最大并发数。
大致过程可用下图表示。
上面的讲解和图来可以很好的理解的这个过程。
如果是做 Android 开发的,并且对 Handle r原理比较熟悉,你可能会觉得这个图挺熟悉,其中的一些过程和 Handler,Looper,Meaasge 使用中,很相似。
Handler.send(Message) 相当于 execute(Runnuble),Looper 中维护的 Meaasge 队列相当于 BlockingQueue,只不过需要自己通过同步来维护这个队列,Looper 中的 loop() 函数循环从 Meaasge 队列取 Meaasge 和 Worker 中的 runWork() 不断从 BlockingQueue 取 Runnable 是同样的道理。
3.管理线程
通过线程池可以很好的管理线程的复用,控制并发数,以及销毁等过程,线程的复用和控制并发上面已经讲了,而线程的管理过程已经穿插在其中了,也很好理解。
在 ThreadPoolExecutor 有个 ctl 的 AtomicInteger 变量。
通过这一个变量保存了两个内容:
所有线程的数量
每个线程所处的状态
其中低29位存线程数,高 3 位存 runState,通过位运算来得到不同的值。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));//得到线程的状态private static int runStateOf(int c) { return c & ~CAPACITY;
}//得到Worker的的数量private static int workerCountOf(int c) { return c & CAPACITY;
}// 判断线程是否在运行private static boolean isRunning(int c) { return c < SHUTDOWN;
}
这里主要通过 shutdown 和 shutdownNow() 来分析线程池的关闭过程。
首先线程池有五种状态来控制任务添加与执行。主要介绍以下三种:
RUNNING 状态:线程池正常运行,可以接受新的任务并处理队列中的任务;
SHUTDOWN 状态:不再接受新的任务,但是会执行队列中的任务;
STOP 状态:不再接受新任务,不处理队列中的任务。
shutdown 这个方法会将 runState 置为 SHUTDOWN,会终止所有空闲的线程,而仍在工作的线程不受影响,所以队列中的任务人会被执行。
shutdownNow 方法将 runState 置为 STOP。和 shutdown 方法的区别,这个方法会终止所有的线程,所以队列中的任务也不会被执行了。
感谢大家阅读由java培训机构分享的“Java 线程池实现的原理”希望对大家有所帮助,更多精彩内容请关注Java培训官网
免责声明:本文由小编转载自网络,旨在分享提供阅读,版权归原作者所有,如有侵权请联系我们进行删除
【免责声明】本文部分系转载,转载目的在于传递更多信息,并不代表本网赞同其观点和对其真实性负责,如涉及作品内容、版权和其它问题,请在30日内与我们联系,我们会予以重改或删除相关文章,以保证您的权益!
Java开发高端课程免费试学
大咖讲师+项目实战全面提升你的职场竞争力
- 海量实战教程
- 1V1答疑解惑
- 行业动态分析
- 大神学习路径图
相关推荐
更多2015-10-15
2015-10-15
达内就业喜报
更多>Java开班时间
-
北京 丨 11月27日
火速抢座 -
上海 丨 11月27日
火速抢座 -
广州 丨 11月27日
火速抢座 -
兰州 丨 11月27日
火速抢座 -
杭州 丨 11月27日
火速抢座 -
南京 丨 11月27日
火速抢座 -
沈阳 丨 11月27日
火速抢座 -
大连 丨 11月27日
火速抢座 -
长春 丨 11月27日
火速抢座 -
哈尔滨 丨 11月27日
火速抢座 -
济南 丨 11月27日
火速抢座 -
青岛 丨 11月27日
火速抢座 -
烟台 丨 11月27日
火速抢座 -
西安 丨 11月27日
火速抢座 -
天津 丨 11月27日
火速抢座 -
石家庄 丨 11月27日
火速抢座 -
保定 丨 11月27日
火速抢座 -
郑州 丨 11月27日
火速抢座 -
合肥 丨 11月27日
火速抢座 -
太原 丨 11月27日
火速抢座 -
苏州 丨 11月27日
火速抢座 -
武汉 丨 11月27日
火速抢座 -
成都 丨 11月27日
火速抢座 -
重庆 丨 11月27日
火速抢座 -
厦门 丨 11月27日
火速抢座 -
福州 丨 11月27日
火速抢座 -
珠海 丨 11月27日
火速抢座 -
南宁 丨 11月27日
火速抢座 -
东莞 丨 11月27日
火速抢座 -
贵阳 丨 11月27日
火速抢座 -
昆明 丨 11月27日
火速抢座 -
洛阳 丨 11月27日
火速抢座 -
临沂 丨 11月27日
火速抢座 -
潍坊 丨 11月27日
火速抢座 -
运城 丨 11月27日
火速抢座 -
呼和浩特丨11月27日
火速抢座 -
长沙 丨 11月27日
火速抢座 -
南昌 丨 11月27日
火速抢座 -
宁波 丨 11月27日
火速抢座 -
深圳 丨 11月27日
火速抢座 -
大庆 丨 11月27日
火速抢座