轩辕李的博客 轩辕李的博客
首页
  • Java
  • Spring
  • 其他语言
  • 工具
  • HTML&CSS
  • JavaScript
  • 分布式
  • 代码质量管理
  • 基础
  • 操作系统
  • 计算机网络
  • 编程范式
  • 安全
  • 中间件
  • 心得
关于
  • 分类
  • 标签
  • 归档
GitHub (opens new window)

轩辕李

勇猛精进,星辰大海
首页
  • Java
  • Spring
  • 其他语言
  • 工具
  • HTML&CSS
  • JavaScript
  • 分布式
  • 代码质量管理
  • 基础
  • 操作系统
  • 计算机网络
  • 编程范式
  • 安全
  • 中间件
  • 心得
关于
  • 分类
  • 标签
  • 归档
GitHub (opens new window)
  • Java

    • 核心

    • 并发

      • Java并发-线程基础与synchronized关键字
      • Java并发-重入锁ReetrantLock的使用
      • Java并发-信号量Semaphore
      • Java并发-读写锁ReadWriteLock
      • Java并发-倒计时器CountDownLatch
      • Java并发-栅栏CyclicBarrier
      • Java并发-LockSupport线程阻塞工具类
      • Java并发-线程池ThreadPoolExecutor
        • 一、何为线程池
        • 二、先来讲讲Executors与ExecutorService
          • 1、ScheduledExecutorService
        • 三、重头戏之ThreadPoolExecutor
          • 1、workQueue
          • 2、threadFactory
          • 3、rejectedExecutionHandler
          • 4、扩展线程池
        • 四、不要使用Executors(What?)
        • 五、submit提交任务无异常的问题
        • 六、线程池停止
      • Java并发-阻塞队列BlockingQueue
      • Java并发-以空间换时间之ThreadLocal
      • Java并发-无锁策略CAS与atomic包
      • Java并发-JDK并发容器
      • Java并发-异步调用结果之Future和CompletableFuture
      • Java并发-Fork Join框架
      • Java并发-调试与诊断
    • 经验

    • JVM

    • 企业应用

  • Spring

  • 其他语言

  • 工具

  • 后端
  • Java
  • 并发
轩辕李
2018-06-25
目录

Java并发-线程池ThreadPoolExecutor

# 一、何为线程池

如果你接触过对线池,比如数据库连接池,他们要解决的问题是:对象的启动耗费资源比较多,最好能做到只启动一次,然后重复使用。我们平常用到static final去修饰常量,也是这个意思。
对于线程来说,启动线程相对来说耗费时间会久一点,而且线程开太多,又会耗费内存,导致GC压力,所以我们需要有控制和管理的手段。
线程池顾名思义就是一个池子,使用了它之后,创建线程变成了从池子获得线程;关闭线程变成了把线程归还给池子。

# 二、先来讲讲Executors与ExecutorService

JDK中自然提供了一套线程池的实现,就是ThreadPoolExecutor。
为了更简单的使用ThreadPoolExecutor,JDK提供了线程池工厂类Executors,我们来看看他都与那些工厂方法:

  • newFixedThreadPool(int nThreads) 构建一个拥有固定线程数量的线程池
  • newSingleThreadExecutor() 构建一个只拥有一个线程的线程池
  • newCachedThreadPool() 构建一个弹性的线程池,需要多少就有多少
  • newSingleThreadScheduledExecutor() 构建一个只拥有一个线程的计划任务线程池
  • newScheduledThreadPool(int corePoolSize) 构建一个弹性的计划任务线程池

我们先来看一个例子:

ExecutorService executorService = Executors.newSingleThreadExecutor();
Future<Integer> future = executorService.submit(()->{return 12;});
System.out.println(future.get());
executorService.shutdown();

我们看到了几个新的类,Executors的工厂类方法会返回一个ExecutorService,实际上他是ThreadPoolExecutor的父接口。
ExecutorService提供了submit()方法,把一个Runnable或Callback提交到线程池去执行,这里的入参是Callback,Callback是一个回调接口,没有入参但有返回值。submit()方法提交了一个任务之后,会返回一个Future用来表示异步计算的结果,这里也就是获得Callback所返回的结果。
那么如果submit()入参是Runnable呢,这样的话Future一般只会获取到null。
ExecutorService是继承自Executor接口,有时你不需要用Future来获取异步计算的结果,只是想从线程池取出线程来执行一些任务,那么可以:

ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.execute(()->{});
executorService.shutdown();

需要注意的是线程池创建之后是不会自动关闭的,需要手动调用shutdown()方法,ExecutorService还有一个shutdownNow()方法,表示立即结束,而不是在所有线程工作完成后优雅的结束。

# 1、ScheduledExecutorService

在上面我们看到Executors的工厂方法中有两个会返回ScheduledExecutorService,分别是newSingleThreadScheduledExecutor()和newScheduledThreadPool(int corePoolSize),这个跟计划任务有关了,类似Linux的at命令。
主要有两个方法来对任务进行周期性的调度:

  • scheduleAtFixedRate() 以上个任务的执行时间为起点,之后的period时间,调度下一次任务
  • scheduleWithFixedDelay() 以上个任务的结束时间为起点,经过delay时间进行任务调度

# 三、重头戏之ThreadPoolExecutor

Executors的工厂方法比较合适初学者使用,简单直接。如果你想做一个高玩,那么就不得不去探索一下核心线程池的内部实现了。
来看一下Executor的newFixedThreadPool()方法:

public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>(),
                                  threadFactory);
}

会发现他其实是返回了一个ThreadPoolExecutor对象,可以确定ThreadPoolExecutor就是线程池的实现类了,那ThreadPoolExecutor构造函数的这几个参数都是什么意思呢? 看一下ThreadPoolExecutor最丰富的构造,其他构造都是调用这个的:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) 

我们一个个来分析一下,会发现他的完善和强大:

  • int corePoolSize 指定了线程池核心(最少)线程数量
  • int maximumPoolSize 指定了线程池最大线程数量
  • long keepAliveTime 指定了线程存活时间,也就是当有线程被归还的时候,他的存活时间,毕竟线程一直运行着也是耗费资源
  • TimeUnit unit 线程存活时间单位
  • BlockingQueue<Runnable> workQueue 任务队列,保存被提交但未被执行的任务。如果线程池已满且所有线程都在执行任务,那么后来提交的任务就会暂时保存在这个队列中
  • ThreadFactory threadFactory 创建线程时用到的线程工厂类
  • RejectedExecutionHandler handler 拒绝策略。当任务队列workQueue也满了的时候,再有任务提交到线程池,要通过什么把他拒绝掉

一下看到这么多,还是挺吓人的。其实前面几个参数还比较通俗易懂,比较难理解的在于后三个参数,我们来一一拆解一下。

# 1、workQueue

如果对BlockingQueue不熟悉,请参考https://my.oschina.net/lizaizhong/blog/1840206 (opens new window)。
这个workQueue有以下几种选择:

  • SynchronousQueue 这是一个比较特殊的队列,本身没有容量,来一个就得消费一个。可以看到Executors.newCachedThreadPool()就是使用的这个队列,因为newCachedThreadPool()中的maximumPoolSize为无限大
  • ArrayBlockingQueue 有界队列
  • LinkedBlockingQueue 无界队列
  • PriorityBlockingQueue 优先级队列

# 2、threadFactory

threadFactory顾名思义就是线程工厂。如果我们想对线程池中的线程进行一些自定义配置,那么可以重写ThreadFactory的newThread()方法:

ThreadFactory threadFactory = new ThreadFactory() {
    @Override
    public Thread newThread(Runnable r) {
        Thread thread = new Thread(r);
        thread.setName("myThread-"+thread.getId());
        thread.setDaemon(true);
        return thread;
    }
};

# 3、rejectedExecutionHandler

当线程池线程都被取出,并且任务队列里的任务也排满的时候,新进入的任务怎么办?只能被拒绝。
JDK提供了四种拒绝策略,分别如下:

  • AbortPolicy 该策略直接抛出异常,阻止系统正常工作
  • CallerRunsPolicy 只要线程池未关闭,该策略直接在调用者(main)线程中运行当前被丢弃的任务。这种策略容易造成调用者线程的性能急剧下降
  • DiscardOldestPolicy 该策略丢弃任务队列中最老(最早)的一个请求,并尝试再次提交当前任务请求
  • DiscardPolicy 该策略默默丢弃无法处理的任务

# 4、扩展线程池

ThreadPoolExecutor的扩展机制类似与拦截器,但并未提供接口方法,而是需要重写这些方法,主要是三个方法:

ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 5, 10, TimeUnit.MINUTES	, new SynchronousQueue<>()) {
    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        System.out.println("准备执行");
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        System.out.println("执行完成");
    }

    @Override
    protected void terminated() {
        System.out.println("线程池退出");
    }
};

# 四、不要使用Executors(What?)

如果仔细查看一下Executors的几个工厂方法,例如newSingleThreadExecutor():

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

会发现他返回的线程池,请求任务队列是一个无界队列,那么这样容易出现什么问题呢?想必你已经想到了,容易出现任务大量堆积,导致OOM。
其他的工厂方法也存在这个问题,总结起来就是:

  • fixedThreadPool和singleThreadPool请求任务队列为无界队列,容易OOM
  • cachedThreadPool和scheduledThreadPool创建线程的数量为Integer.MAX_VALUE,容易OOM

Executors的这个问题,在大型项目中尤其需要注意。所以根据前辈们的血泪史,我们得到的宝贵经验就是:不要使用Executors获得线程池,最好选择自己来构建。

# 五、submit提交任务无异常的问题

如果你用线程池提交了一个Runnable任务,例如:

Future f = executor.submit(()->{System.out.println(1/0);});

可以看到这个任务会抛出异常,但是submit方法并不会捕获这个异常,只有f.get()的时候会抛出异常。
而如果你使用execute()方法执行任务,则在新线程中会直接抛出异常。
sumbit()为何会如此不同,感兴趣的话,可以看FutureTask的run()方法即可了解原因。

# 六、线程池停止

创建了Executor之后,不手动停止,JVM不会自动退出的。
前面讲到shutdown和shutdownNow,它们的主要区别是:

  • shutdown 会等待线程池中已执行的任务执行完成后退出,且不再执行队列中等待的任务
  • shutdownNow 会强制停止线程池中已执行的任务,且不再执行队列中等待的任务

如果想要让所有任务(包括队列中)都执行完毕才停止线程池,那么需要变通的实现一下:

CountDownLatch latch = new CountDownLatch(taskNum);
threadPoolExecutor.execute(() -> {
    try {
        ...
    } catch (Exception e) {
        log.error("exec", e);
    } finally {
        latch.countDown();
    }
})
latch.await();
threadPoolExecutor.shutdownNow();
编辑 (opens new window)
#ThreadPoolExecutor
上次更新: 2023/06/14
Java并发-LockSupport线程阻塞工具类
Java并发-阻塞队列BlockingQueue

← Java并发-LockSupport线程阻塞工具类 Java并发-阻塞队列BlockingQueue→

最近更新
01
Spring Boot版本新特性
09-15
02
Spring框架版本新特性
09-01
03
Spring Boot开发初体验
08-15
更多文章>
Theme by Vdoing | Copyright © 2018-2025 京ICP备2021021832号-2 | MIT License
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式