上一篇Java线程池-1中,我们了解了线程池的相关参数以及内置的几种线程池,现在我们来研究下如何自己实现一个线程池。
在实现之前,我们先思考下,一个线程池有哪些要素呢?
- 线程复用
- 线程管理
- 拒绝策略
线程复用
如何实现线程的复用呢?还记得SingleThreadExecutor吗,这个线程池只有一个工作线程,这其实就是线程复用最简单的实现方式:只用一个线程来处理所有的任务。
public class MyThreadPoolExecutor {
/**
* 保存提交的任务
*/
private LinkedBlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
public MyThreadPoolExecutor() {
// 启动一个线程不断拉取队列中的任务,并执行任务
new Thread(()->{
while (true){
try {
Runnable take = taskQueue.take();
take.run();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
/**
* 提交一个任务至线程池中
* @param task 任务
*/
public void execute(Runnable task){
taskQueue.offer(task);
}
}
在线程池创建时,就启动一个线程,不断从队列中拉取任务,并执行任务。上述线程池虽然实现了线程的复用,但是我们无法对线程进行控制,既不能创建也不能销毁。
线程管理(woker创建与销毁,线程池maximumPoolSize)
为了方便对线程进行管理,我们可以把其封装成一个worker对象,通过管理对象来管理线程。
/**
* @author wbl
* @date 2020-02-06
*/
public class MyThreadPoolExecutor2 {
/**
* 等待任务队列
*/
private LinkedBlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
/**
* 管理工作线程
*/
private HashSet<Worker> workers = new HashSet<>();
/**
* 线程池最大线程数
*/
private int maximumPoolSize;
/**
* 定制工作线程的创建
*/
ThreadFactory threadFactory;
public MyThreadPoolExecutor2(int maximumPoolSize, ThreadFactory threadFactory) {
this.maximumPoolSize = maximumPoolSize;
this.threadFactory = threadFactory;
}
/**
* 若新增worker成功,则直接返回,若worker达到上限,则将任务放入等待队列
* @param task
*/
public void execute(Runnable task){
if (addWorker(task)){
return;
}
taskQueue.offer(task);
}
/**
* 新增一个工作线程worker
* @param firstTask 提交的任务
* @return true 新增成功, false 线程池线程数量达到上限,不在创建新的worker
*/
private boolean addWorker(Runnable firstTask){
if (workers.size() >= maximumPoolSize){
return false;
}
Worker worker = new Worker(firstTask);
Thread t = worker.thread;
if (t != null){
workers.add(worker);
t.start();
}
return true;
}
/**
* 启动worker
* @param worker
*/
private void runWorker(Worker worker){
Runnable task = worker.firstTask;
// 不断从等待队列来取任务
while (task != null || (task = getTask()) != null){
task.run();
}
// 若没有任务,则销毁worker
processWorkerExit(worker);
}
/**
* 销毁空闲worker
* @param worker 待销毁worker
*/
private void processWorkerExit(Worker worker){
workers.remove(worker);
}
/**
* 从等待队列拉取任务
* @return
*/
private Runnable getTask(){
return taskQueue.poll();
}
class Worker implements Runnable{
Thread thread;
Runnable firstTask;
Worker(Runnable firstTask) {
this.firstTask = firstTask;
this.thread = threadFactory.newThread(this);
}
@Override
public void run() {
runWorker(this);
}
}
}
在这个版本中,我们新增了一个散列集workers用来保存创建的worker,同时新增了一个threadFactory,以便用户定制化worker的创建
线程的启动是通过runWorker方法来实现,worker在创建时会提交第一个任务(firstTask),之后worker不断从等待队列中来取任务
线程的创建是通过addWorker方法来实现,在创建新的worker时,我们先判断worker的数量是否达到线程池的上限(maximumPoolSize),若达到上限,则不再创建worker,并把提交的task放入等待队列中。
线程的销毁是通过processWorkerExit,当worker从等待队列中获取不到任务时,便删除该worker。
线程管理(核心线程数,线程存活时间)
细心的读者可能会发现上述版本的线程池其实存在一个问题,我们再来看下runWorker的实现。当任务提交不频繁时,等待队列不会有堆积的任务,这样新提交一个任务,线程池创建一个worker来执行任务,任务执行完毕后,该worker立马就被销毁。再次提交任务,线程池又会重新创建一个worker,这样线程就没有被复用。
/**
* 启动worker
* @param worker
*/
private void runWorker(Worker worker){
Runnable task = worker.firstTask;
// 不断从等待队列来取任务
while (task != null || (task = getTask()) != null){
task.run();
}
// 若没有任务,则销毁worker
processWorkerExit(worker);
}
针对上述问题,我们需要新增两个变量,corePoolSize(核心线程数)以及keepAliveTime(线程存活时间)。
首先我们修改getTask方法,若等待队列没有新增任务,则等待一段时间,超过时间还没有任务,则销毁worker
private Runnable getTask(){
Runnable task = null;
try {
task = taskQueue.poll(keepAliveTime, timeUnit);
}catch (InterruptedException e) {
}
return task;
}
其次需要修改addWorker方法,增加参数core,表示创建的是否是核心线程
private boolean addWorker(Runnable firstTask, boolean core){
int ws = workers.size();
if (ws >= (core ? corePoolSize : maximumPoolSize)){
return false;
}
Worker worker = new Worker(firstTask);
Thread t = worker.thread;
if (t != null){
workers.add(worker);
t.start();
}
return true;
}
最后修改execute方法,新增任务,首先创建核心线程,若核心线程达到上限,则判断线程池的线程数是否达到上限,若达到上限,则把任务放入等待队列
public void execute(Runnable task){
// 若新增核心线程成功,则直接返回
if (addWorker(task, true)){
return;
}
// 若新增worker失败,则把任务放入等待队列
if (!addWorker(task, false)){
taskQueue.offer(task);
}
}
拒绝策略
上述线程池还有需要完善的地方,对于处理失败的任务,需要提供一个切口供用户处理。我们提供一个接口AbortPolicy,用户可以实现该接口来处理这些任务
/**
* @author wbl
* @date 2020-02-06
*/
public interface AbortPolicy {
void reject(Runnable task, ThreadPoolExecutor poolExecutor);
}
接下来我们修改execute方法,对于提交失败的任务调用reject方法
public void execute(Runnable task){
// 若新增核心线程成功,则直接返回
if (addWorker(task, true)){
return;
}
// 若新增worker失败,则把任务放入等待队列
if (!addWorker(task, false)){
// 等待队列满了,则拒绝处理这个任务
if (!taskQueue.offer(task)){
abortPolicy.reject(task,this);
}
}
}
小结
大家可以在思考下,上述线程池是否还有需要完善的地方呢?答案是肯定的,上述线程池实际上简化了JDK线程池相关的逻辑。真正的线程池在管理worker时,还需要处理并发的问题以保证创建worker被正确,另外还需要管理线程池状态(shutdown,terminated)