Java线程池-2

上一篇Java线程池-1中,我们了解了线程池的相关参数以及内置的几种线程池,现在我们来研究下如何自己实现一个线程池。

在实现之前,我们先思考下,一个线程池有哪些要素呢?

  1. 线程复用
  2. 线程管理
  3. 拒绝策略

线程复用

如何实现线程的复用呢?还记得SingleThreadExecutor吗,这个线程池只有一个工作线程,这其实就是线程复用最简单的实现方式:只用一个线程来处理所有的任务。

public class MyThreadPoolExecutor {
    /**
     * 保存提交的任务
     */
    private LinkedBlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();

    public MyThreadPoolExecutor() {
        // 启动一个线程不断拉取队列中的任务,并执行任务
        new Thread(()->{
            while (true){
                try {
                    Runnable take = taskQueue.take();
                    take.run();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }

    /**
     * 提交一个任务至线程池中
     * @param task 任务
     */
    public void execute(Runnable task){
        taskQueue.offer(task);
    }
}

在线程池创建时,就启动一个线程,不断从队列中拉取任务,并执行任务。上述线程池虽然实现了线程的复用,但是我们无法对线程进行控制,既不能创建也不能销毁。

线程管理(woker创建与销毁,线程池maximumPoolSize)

为了方便对线程进行管理,我们可以把其封装成一个worker对象,通过管理对象来管理线程。

/**
 * @author wbl
 * @date 2020-02-06
 */
public class MyThreadPoolExecutor2 {

    /**
     * 等待任务队列
     */
    private LinkedBlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();

    /**
     * 管理工作线程
     */
    private HashSet<Worker> workers = new HashSet<>();

    /**
     * 线程池最大线程数
     */
    private int maximumPoolSize;
    /**
     * 定制工作线程的创建
     */
    ThreadFactory threadFactory;


    public MyThreadPoolExecutor2(int maximumPoolSize, ThreadFactory threadFactory) {
        this.maximumPoolSize = maximumPoolSize;
        this.threadFactory = threadFactory;
    }

    /**
     * 若新增worker成功,则直接返回,若worker达到上限,则将任务放入等待队列
     * @param task
     */
    public void execute(Runnable task){
        if (addWorker(task)){
            return;
        }
        taskQueue.offer(task);
    }

    /**
     * 新增一个工作线程worker
     * @param firstTask 提交的任务
     * @return true 新增成功, false 线程池线程数量达到上限,不在创建新的worker
     */
    private boolean addWorker(Runnable firstTask){
        if (workers.size() >= maximumPoolSize){
            return false;
        }
        Worker worker = new Worker(firstTask);
        Thread t = worker.thread;
        if (t != null){
            workers.add(worker);
            t.start();
        }
        return true;
    }

    /**
     * 启动worker
     * @param worker
     */
    private void runWorker(Worker worker){
        Runnable task = worker.firstTask;
        // 不断从等待队列来取任务
        while (task != null || (task = getTask()) != null){
            task.run();
        }

        // 若没有任务,则销毁worker
        processWorkerExit(worker);
    }

    /**
     * 销毁空闲worker
     * @param worker 待销毁worker
     */
    private void processWorkerExit(Worker worker){
        workers.remove(worker);
    }

    /**
     * 从等待队列拉取任务
     * @return
     */
    private Runnable getTask(){
        return taskQueue.poll();
    }

    class Worker implements Runnable{

        Thread thread;

        Runnable firstTask;

        Worker(Runnable firstTask) {
            this.firstTask = firstTask;
            this.thread = threadFactory.newThread(this);
        }

        @Override
        public void run() {
            runWorker(this);
        }
    }
}

在这个版本中,我们新增了一个散列集workers用来保存创建的worker,同时新增了一个threadFactory,以便用户定制化worker的创建

线程的启动是通过runWorker方法来实现,worker在创建时会提交第一个任务(firstTask),之后worker不断从等待队列中来取任务

线程的创建是通过addWorker方法来实现,在创建新的worker时,我们先判断worker的数量是否达到线程池的上限(maximumPoolSize),若达到上限,则不再创建worker,并把提交的task放入等待队列中。

线程的销毁是通过processWorkerExit,当worker从等待队列中获取不到任务时,便删除该worker。

线程管理(核心线程数,线程存活时间)

细心的读者可能会发现上述版本的线程池其实存在一个问题,我们再来看下runWorker的实现。当任务提交不频繁时,等待队列不会有堆积的任务,这样新提交一个任务,线程池创建一个worker来执行任务,任务执行完毕后,该worker立马就被销毁。再次提交任务,线程池又会重新创建一个worker,这样线程就没有被复用。

/**
 * 启动worker
 * @param worker
 */
private void runWorker(Worker worker){
    Runnable task = worker.firstTask;
    // 不断从等待队列来取任务
    while (task != null || (task = getTask()) != null){
        task.run();
    }

    // 若没有任务,则销毁worker
    processWorkerExit(worker);
}

针对上述问题,我们需要新增两个变量,corePoolSize(核心线程数)以及keepAliveTime(线程存活时间)。

首先我们修改getTask方法,若等待队列没有新增任务,则等待一段时间,超过时间还没有任务,则销毁worker

private Runnable getTask(){
    Runnable task = null;
    try {
        task = taskQueue.poll(keepAliveTime, timeUnit);
    }catch (InterruptedException e) {

    }
    return task;
}

其次需要修改addWorker方法,增加参数core,表示创建的是否是核心线程

private boolean addWorker(Runnable firstTask, boolean core){
    int ws = workers.size();
    if (ws >= (core ? corePoolSize : maximumPoolSize)){
        return false;
    }
    Worker worker = new Worker(firstTask);
    Thread t = worker.thread;
    if (t != null){
        workers.add(worker);
        t.start();
    }
    return true;
}

最后修改execute方法,新增任务,首先创建核心线程,若核心线程达到上限,则判断线程池的线程数是否达到上限,若达到上限,则把任务放入等待队列

public void execute(Runnable task){
        // 若新增核心线程成功,则直接返回
        if (addWorker(task, true)){
            return;
        }

        // 若新增worker失败,则把任务放入等待队列
        if (!addWorker(task, false)){
            taskQueue.offer(task);
        }
    }

拒绝策略

上述线程池还有需要完善的地方,对于处理失败的任务,需要提供一个切口供用户处理。我们提供一个接口AbortPolicy,用户可以实现该接口来处理这些任务

/**
 * @author wbl
 * @date 2020-02-06
 */
public interface AbortPolicy {
    void reject(Runnable task, ThreadPoolExecutor poolExecutor);
}

接下来我们修改execute方法,对于提交失败的任务调用reject方法

public void execute(Runnable task){
    // 若新增核心线程成功,则直接返回
    if (addWorker(task, true)){
        return;
    }

    // 若新增worker失败,则把任务放入等待队列
    if (!addWorker(task, false)){
        // 等待队列满了,则拒绝处理这个任务
        if (!taskQueue.offer(task)){
            abortPolicy.reject(task,this);
        }
    }
}

小结

大家可以在思考下,上述线程池是否还有需要完善的地方呢?答案是肯定的,上述线程池实际上简化了JDK线程池相关的逻辑。真正的线程池在管理worker时,还需要处理并发的问题以保证创建worker被正确,另外还需要管理线程池状态(shutdown,terminated)


Reprint please specify: wbl Java线程池-2

Previous
Kafka——消息存储与处理-3 Kafka——消息存储与处理-3
消息存储存储路径kakfa的消息都会持久化到磁盘,并以日志文件的方式存储。日志文件的保存路径配置在config/server.properties中 # A comma seperated list of directories under
2020-02-15
Next
Java线程池-1 Java线程池-1
总所周知,创建线程比较损耗资源,频繁创建线程容易造成性能降低。因此需要使用线程池,主要原因是可以复用线程,另外使用线程池也可以比较方便的对线程进行管理。下面,让我们一起来看下在Java中如何使用线程池。 ThreadPoolExecutor
2020-02-05