Java并发-线程池ThreadPoolExecutor
# 一、何为线程池
如果你接触过对象池,比如数据库连接池,他们要解决的问题是:对象的创建和销毁耗费资源比较多,最好能做到创建一次,然后重复使用。我们平常用到static final
去修饰常量,也是这个意思。
对于线程来说,频繁创建和销毁线程会带来以下问题:
- 性能开销:线程的创建和销毁需要与操作系统交互,开销较大
- 内存消耗:线程数量过多会占用大量内存,增加GC压力
- 资源耗尽:系统对线程数量有限制,无限制创建可能导致系统资源耗尽
- 管理困难:大量线程难以统一管理和监控
线程池通过预先创建一定数量的线程并重复使用它们来解决这些问题。使用线程池后,创建线程变成了从池子获得线程;关闭线程变成了把线程归还给池子。
# 二、先来讲讲Executors
与ExecutorService
JDK中自然提供了一套线程池的实现,核心实现类是ThreadPoolExecutor
。
# 1、线程池体系结构
Executor (接口)
↑
ExecutorService (接口)
↑
AbstractExecutorService (抽象类)
↑
ThreadPoolExecutor (实现类)
为了更简单的使用ThreadPoolExecutor
,JDK提供了线程池工厂类Executors
,它提供了以下工厂方法:
newFixedThreadPool(int nThreads)
- 构建一个拥有固定线程数量的线程池newSingleThreadExecutor()
- 构建一个只拥有一个线程的线程池newCachedThreadPool()
- 构建一个弹性的线程池,线程数量根据任务动态调整newSingleThreadScheduledExecutor()
- 构建一个只拥有一个线程的计划任务线程池newScheduledThreadPool(int corePoolSize)
- 构建一个拥有指定核心线程数的计划任务线程池newWorkStealingPool()
- 创建一个工作窃取线程池(JDK 1.8+)
我们先来看一个例子:
ExecutorService executorService = Executors.newSingleThreadExecutor();
Future<Integer> future = executorService.submit(()->{return 12;});
System.out.println(future.get());
executorService.shutdown();
我们看到了几个新的类,Executors
的工厂类方法会返回一个ExecutorService
,实际上他是ThreadPoolExecutor
的父接口。
ExecutorService
提供了submit()
方法,把一个Runnable
或Callback
提交到线程池去执行,这里的入参是Callback
,Callback
是一个回调接口,没有入参但有返回值。submit()
方法提交了一个任务之后,会返回一个Future
用来表示异步计算的结果,这里也就是获得Callback
所返回的结果。
那么如果submit()
入参是Runnable
呢,这样的话Future
一般只会获取到null
。
ExecutorService
是继承自Executor
接口,有时你不需要用Future
来获取异步计算的结果,只是想从线程池取出线程来执行一些任务,那么可以:
ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.execute(()->{});
executorService.shutdown();
需要注意的是线程池创建之后是不会自动关闭的,需要手动调用shutdown()
方法,ExecutorService
还有一个shutdownNow()
方法,表示立即结束,而不是在所有线程工作完成后优雅的结束。
# 2、ScheduledExecutorService
在上面我们看到Executors
的工厂方法中有两个会返回ScheduledExecutorService
,分别是newSingleThreadScheduledExecutor()
和newScheduledThreadPool(int corePoolSize)
,这个跟计划任务有关了,类似Linux的at命令。
主要有两个方法来对任务进行周期性的调度:
scheduleAtFixedRate()
以上个任务的执行时间为起点,之后的period时间,调度下一次任务scheduleWithFixedDelay()
以上个任务的结束时间为起点,经过delay时间进行任务调度
# 三、重头戏之ThreadPoolExecutor
Executors
的工厂方法比较合适初学者使用,简单直接。但在生产环境中,我们通常需要更精细的控制,这就需要深入了解ThreadPoolExecutor
的内部实现。
# 1、线程池的工作原理
在深入参数之前,先了解线程池的工作流程:
- 当提交任务时,如果线程池中的线程数小于
corePoolSize
,创建新线程执行任务 - 如果线程数达到
corePoolSize
,新任务会被放入工作队列 - 如果工作队列已满,且线程数小于
maximumPoolSize
,创建新线程执行任务 - 如果线程数达到
maximumPoolSize
且队列已满,执行拒绝策略
来看一下Executors
的newFixedThreadPool()
方法:
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
会发现他其实是返回了一个ThreadPoolExecutor
对象,可以确定ThreadPoolExecutor
就是线程池的实现类了,那ThreadPoolExecutor
构造函数的这几个参数都是什么意思呢?
看一下ThreadPoolExecutor
最丰富的构造,其他构造都是调用这个的:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
我们一个个来详细分析这些参数:
# 1.1、核心参数说明
int corePoolSize
- 核心线程数- 线程池中始终保持的最小线程数量
- 即使线程处于空闲状态也不会被回收(除非设置了
allowCoreThreadTimeOut
)
int maximumPoolSize
- 最大线程数- 线程池允许创建的最大线程数量
- 当工作队列满了之后,会创建新线程直到达到这个数量
long keepAliveTime
- 线程空闲时间- 当线程数大于核心线程数时,多余的空闲线程存活的最长时间
- 超过这个时间,空闲线程会被回收
TimeUnit unit
- 时间单位keepAliveTime
的时间单位- 可选值:
NANOSECONDS
、MICROSECONDS
、MILLISECONDS
、SECONDS
、MINUTES
、HOURS
、DAYS
BlockingQueue<Runnable> workQueue
- 工作队列- 用于保存等待执行的任务的阻塞队列
- 当所有核心线程都在忙时,新任务会被放入这个队列
ThreadFactory threadFactory
- 线程工厂- 用于创建新线程的工厂
- 可以通过自定义线程工厂来设置线程名称、优先级、守护线程状态等
RejectedExecutionHandler handler
- 拒绝策略- 当队列和线程池都满了时,用于处理新提交任务的策略
一下看到这么多,还是挺吓人的。其实前面几个参数还比较通俗易懂,比较难理解的在于后三个参数,我们来一一拆解一下。
# 2、workQueue - 工作队列详解
工作队列是线程池的核心组件之一,不同的队列类型会导致不同的线程池行为。如果对BlockingQueue
不熟悉,请参考Java并发-阻塞队列BlockingQueue。
常用的工作队列有以下几种:
# 2.1、SynchronousQueue
- 直接提交队列
- 特点:没有容量,每个插入操作必须等待对应的删除操作
- 使用场景:适合任务执行时间短且提交频率不高的场景
- 典型应用:
Executors.newCachedThreadPool()
使用此队列
new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>())
# 2.2、ArrayBlockingQueue
- 有界队列
- 特点:基于数组的有界阻塞队列,FIFO顺序
- 使用场景:适合控制资源消耗,防止任务无限堆积
new ThreadPoolExecutor(5, 10, 60L, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(100))
# 2.3、LinkedBlockingQueue
- 无界队列
- 特点:基于链表的阻塞队列,可以设置容量(默认
Integer.MAX_VALUE
) - 使用场景:适合任务提交速度时快时慢的场景
- 注意:无界队列可能导致OOM
new ThreadPoolExecutor(5, 10, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>())
# 2.4、PriorityBlockingQueue
- 优先级队列
- 特点:支持优先级排序的无界队列
- 使用场景:需要按任务优先级执行的场景
new ThreadPoolExecutor(5, 10, 60L, TimeUnit.SECONDS,
new PriorityBlockingQueue<Runnable>())
# 2.5、DelayQueue
- 延迟队列
- 特点:队列中的元素只有到达指定延迟时间后才能被获取
- 使用场景:定时任务、延迟任务
# 3、threadFactory - 线程工厂
ThreadFactory
用于创建新线程,通过自定义线程工厂,我们可以:
- 设置线程名称,便于调试和监控
- 设置线程优先级
- 设置守护线程状态
- 设置线程组
- 记录线程创建日志
# 3.1、基本使用示例
ThreadFactory threadFactory = new ThreadFactory() {
private final AtomicInteger threadNumber = new AtomicInteger(1);
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("pool-custom-thread-" + threadNumber.getAndIncrement());
thread.setDaemon(false); // 非守护线程
thread.setPriority(Thread.NORM_PRIORITY);
return thread;
}
};
# 3.2、使用Guava的ThreadFactoryBuilder
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat("task-pool-%d")
.setDaemon(false)
.setPriority(Thread.NORM_PRIORITY)
.setUncaughtExceptionHandler((t, e) -> {
System.err.println("Thread " + t.getName() + " threw exception: " + e);
})
.build();
# 4、rejectedExecutionHandler - 拒绝策略
当线程池和任务队列都满了,新任务将被拒绝。选择合适的拒绝策略对系统稳定性至关重要。
# 4.1、JDK内置拒绝策略
# a、AbortPolicy
- 中止策略(默认)
- 行为:直接抛出
RejectedExecutionException
异常 - 使用场景:适合关键业务,不允许任务丢失
new ThreadPoolExecutor.AbortPolicy()
# b、CallerRunsPolicy
- 调用者运行策略
- 行为:在调用者线程中直接执行被拒绝的任务
- 优点:不会丢失任务,提供了一种简单的流控机制
- 缺点:可能导致调用者线程性能下降
new ThreadPoolExecutor.CallerRunsPolicy()
# c、DiscardOldestPolicy
- 丢弃最老任务策略
- 行为:丢弃队列中最老的任务,然后尝试重新提交当前任务
- 使用场景:适合对实时性要求高的场景
new ThreadPoolExecutor.DiscardOldestPolicy()
# d、DiscardPolicy
- 丢弃策略
- 行为:默默丢弃被拒绝的任务,不抛出异常
- 使用场景:适合允许任务丢失的场景
new ThreadPoolExecutor.DiscardPolicy()
# 4.2、自定义拒绝策略
RejectedExecutionHandler customHandler = new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
// 记录日志
System.err.println("Task rejected: " + r.toString());
// 保存到数据库或消息队列
saveToDatabase(r);
// 或者创建新线程执行
new Thread(r).start();
}
};
# 5、扩展线程池
ThreadPoolExecutor
提供了钩子方法用于扩展,可以在任务执行前后进行自定义操作:
public class MonitoringThreadPool extends ThreadPoolExecutor {
private final Map<String, Long> startTimes = new ConcurrentHashMap<>();
public MonitoringThreadPool(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
@Override
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
startTimes.put(String.valueOf(r.hashCode()), System.currentTimeMillis());
System.out.println("Thread " + t.getName() + " 开始执行任务");
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
Long startTime = startTimes.remove(String.valueOf(r.hashCode()));
if (startTime != null) {
long elapsed = System.currentTimeMillis() - startTime;
System.out.println("任务执行耗时: " + elapsed + " ms");
}
if (t != null) {
System.err.println("任务执行异常: " + t);
}
}
@Override
protected void terminated() {
super.terminated();
System.out.println("线程池关闭,总计执行任务数: " + getCompletedTaskCount());
}
}
# 四、生产环境最佳实践
# 1、为什么不推荐使用Executors?
阿里巴巴Java开发手册明确指出:线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式。
让我们分析一下Executors
工厂方法的问题:
# 1.1、newFixedThreadPool
和newSingleThreadExecutor
的问题
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()); // 无界队列!
}
问题:使用无界队列LinkedBlockingQueue
,当任务提交速度超过处理速度时,队列会无限增长,最终导致OOM。
# 1.2、newCachedThreadPool
和newScheduledThreadPool
的问题
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, // 最大线程数无限!
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
问题:最大线程数为Integer.MAX_VALUE
,可能创建大量线程,导致OOM。
# 2、推荐的线程池配置方式
// 正确的创建方式
ThreadPoolExecutor executor = new ThreadPoolExecutor(
10, // 核心线程数
20, // 最大线程数
60L, // 空闲线程存活时间
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(100), // 有界队列
new ThreadFactoryBuilder() // 自定义线程工厂
.setNameFormat("business-pool-%d")
.build(),
new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略
);
# 五、异常处理的注意事项
# 1、对比execute的异常处理差异
线程池提供了两种提交任务的方式,它们在异常处理上有重要区别:
# 1.1、execute()方法
executor.execute(() -> {
System.out.println(1/0); // 异常会直接抛出
});
- 异常会被直接抛出到线程的
UncaughtExceptionHandler
- 如果没有设置异常处理器,异常信息会打印到控制台
# 1.2、submit()方法
Future<?> future = executor.submit(() -> {
System.out.println(1/0); // 异常被封装在Future中
});
// 异常只有在调用get()时才会抛出
try {
future.get(); // 这里会抛出ExecutionException
} catch (ExecutionException e) {
Throwable cause = e.getCause(); // 获取真正的异常
}
- 异常会被封装在
Future
对象中 - 只有调用
Future.get()
时才会抛出ExecutionException
- 如果不调用
get()
,异常会被静默吞掉
# 2、最佳实践:统一的异常处理
ThreadPoolExecutor executor = new ThreadPoolExecutor(
5, 10, 60L, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(100),
new ThreadFactoryBuilder()
.setNameFormat("task-pool-%d")
.setUncaughtExceptionHandler((t, e) -> {
System.err.println("Thread " + t.getName() + " threw exception: " + e);
// 记录日志、发送告警等
})
.build(),
new ThreadPoolExecutor.AbortPolicy()
) {
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
if (t != null) {
// 处理execute()方法的异常
System.err.println("Task execution failed: " + t);
} else if (r instanceof Future<?>) {
try {
((Future<?>) r).get();
} catch (ExecutionException ee) {
// 处理submit()方法的异常
System.err.println("Task execution failed: " + ee.getCause());
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
}
}
};
# 六、线程池的关闭策略
# 1、为什么需要关闭线程池?
线程池中的核心线程默认会一直存活,即使没有任务执行。如果不手动关闭线程池,JVM将无法正常退出。
# 2、两种关闭方法的区别
# 2.1、shutdown()
- 优雅关闭
executor.shutdown();
- 不再接受新任务
- 等待已提交的任务执行完成(包括正在执行和队列中的任务)
- 不会中断正在执行的任务
# 2.2、shutdownNow()
- 立即关闭
List<Runnable> unfinishedTasks = executor.shutdownNow();
- 不再接受新任务
- 尝试中断正在执行的任务
- 返回队列中未执行的任务列表
- 不保证正在执行的任务能够停止
# 3、优雅关闭的最佳实践
public static void gracefulShutdown(ExecutorService executor, int timeout, TimeUnit unit) {
try {
// 1. 停止接收新任务
executor.shutdown();
// 2. 等待现有任务完成
if (!executor.awaitTermination(timeout, unit)) {
// 3. 超时后强制关闭
System.err.println("线程池未能在" + timeout + unit + "内完成,强制关闭");
List<Runnable> droppedTasks = executor.shutdownNow();
System.err.println("被丢弃的任务数: " + droppedTasks.size());
// 4. 再次等待任务响应中断
if (!executor.awaitTermination(timeout, unit)) {
System.err.println("线程池未能终止");
}
}
} catch (InterruptedException e) {
// 5. 当前线程被中断,立即关闭线程池
executor.shutdownNow();
Thread.currentThread().interrupt();
}
}
# 4、使用CountDownLatch确保所有任务完成
int taskCount = 100;
CountDownLatch latch = new CountDownLatch(taskCount);
ThreadPoolExecutor executor = new ThreadPoolExecutor(...);
for (int i = 0; i < taskCount; i++) {
executor.execute(() -> {
try {
// 执行任务
doWork();
} catch (Exception e) {
log.error("Task failed", e);
} finally {
latch.countDown(); // 无论成功失败都要计数
}
});
}
// 等待所有任务完成
latch.await();
// 关闭线程池
executor.shutdown();
# 七、线程池参数调优
# 1、如何合理配置线程池参数?
# 1.1、核心线程数和最大线程数的设置
根据任务类型来设置:
CPU密集型任务
int cpuCores = Runtime.getRuntime().availableProcessors();
int corePoolSize = cpuCores + 1; // CPU核心数 + 1
IO密集型任务
int cpuCores = Runtime.getRuntime().availableProcessors();
int corePoolSize = cpuCores * 2; // CPU核心数 * 2
// 或者使用公式:线程数 = CPU核心数 * (1 + 平均等待时间/平均工作时间)
# 1.2、队列容量的设置
// 根据任务量和内存限制来设置
int queueCapacity = 1000; // 需要根据实际情况调整
// 监控队列使用情况
int queueSize = executor.getQueue().size();
int remainingCapacity = executor.getQueue().remainingCapacity();
# 1.3、动态调整线程池参数
ThreadPoolExecutor executor = new ThreadPoolExecutor(...);
// 运行时动态调整
executor.setCorePoolSize(20);
executor.setMaximumPoolSize(50);
executor.setKeepAliveTime(30, TimeUnit.SECONDS);
# 2、线程池监控
public class ThreadPoolMonitor {
private final ThreadPoolExecutor executor;
private final ScheduledExecutorService monitorExecutor =
Executors.newSingleThreadScheduledExecutor();
public ThreadPoolMonitor(ThreadPoolExecutor executor) {
this.executor = executor;
}
public void startMonitoring() {
monitorExecutor.scheduleAtFixedRate(() -> {
System.out.println(String.format(
"Pool Size: %d, Active: %d, Completed: %d, Task: %d, Queue: %d",
executor.getPoolSize(),
executor.getActiveCount(),
executor.getCompletedTaskCount(),
executor.getTaskCount(),
executor.getQueue().size()
));
}, 0, 1, TimeUnit.SECONDS);
}
}
# 八、常见问题和解决方案
# 1、线程池死锁
问题:任务之间相互依赖,导致死锁
// 错误示例:可能导致死锁
Future<String> future1 = executor.submit(() -> {
Future<String> future2 = executor.submit(() -> "result2");
return future2.get(); // 等待另一个任务,可能死锁
});
解决方案:避免任务间依赖,或使用不同的线程池
# 2、任务执行时间过长
解决方案:设置任务超时
Future<String> future = executor.submit(callable);
try {
String result = future.get(5, TimeUnit.SECONDS); // 5秒超时
} catch (TimeoutException e) {
future.cancel(true); // 超时则取消任务
}
# 3、内存泄漏
问题:ThreadLocal变量未清理 解决方案:在任务结束时清理ThreadLocal
executor.execute(() -> {
try {
// 使用ThreadLocal
threadLocal.set(value);
// 执行任务
} finally {
threadLocal.remove(); // 清理ThreadLocal
}
});
# 九、总结
- 不要使用Executors创建线程池,而是直接使用
ThreadPoolExecutor
- 合理配置线程池参数,根据任务类型(CPU密集型/IO密集型)调整
- 正确处理异常,避免异常被吞掉
- 优雅关闭线程池,确保任务正确完成
- 监控线程池状态,及时发现和解决问题
- 避免任务间依赖,防止死锁
- 设置合理的拒绝策略,防止系统过载