线程池
线程池的创建方式,以及自定义线程池和如何使用自定义的线程池
# java线程池
本篇我们对java
的线程池做一个详细的总结,我们都知道,在需要用到多线程的业务的场景下,就需要用到我们的线程池,顾名思义:就是存放线程的一个容器。
线程在线程池中被创建出来,以供线程进行调度使用。
# 为什么需要线程池
我们都知道java
中进行创建线程的方式有三种,继承Thread
类,实现Runnable
或者实现Callable
接口,但是使用这些方式创建的线程会存在频繁创建、
销毁、线程上下文切换的问题,同时创建过多的线程可能引发资源耗尽的风险,所以我们很有必要将线程池引入,来管理线程任务。
# 线程池创建-
Java
为我们提供了七种创建线程池的方式,即:使用Executors
工厂类自动创建线程的六种方式和使用ThreadPoolExecutor
类进行自定义线程池。
我们先来看使用Executors
工厂类自动创建线程:
# newFixedThreadPool
- 源码
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
2
3
4
5
6
7
8
9
10
11
12
说明:该线程池的核心线程和最大线程全是重用固定的,以共享的无界队列方式来运行这些线程,超出的线程会在队列中等待。 可以看到,给我们提供了俩个构造方法,因此我们知道
threadFactory
:线程工厂是我们可以自己定义的,其实线程的七大参数我们都能够进行收到定义,下文 我们手写线程池的时候会详细的介绍每一个参数。
# newCachedThreadPool
- 源码
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
2
3
4
5
6
7
8
9
10
11
12
说明:创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。若创建过的线程可以使用,则重用那些线程
# newSingleThreadExecutor
- 源码
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
2
3
4
5
6
7
8
9
10
11
12
13
14
说明:只有一个线程的线程池,能够保证任务顺序执行
# newScheduledThreadPool
- 源码
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
2
3
4
5
6
7
8
说明:定时的执行定时任务的线程池,维护一个延时队列
# newSingleThreadScheduledExecutor
- 源码
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1));
}
public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1, threadFactory));
}
2
3
4
5
6
7
8
9
说明:创建一个可以调度命令的单线程执行程序,在给定的延迟后运行,或周期性地执行
# newWorkStealingPool
- 源码
public static ExecutorService newWorkStealingPool(int parallelism) {
return new ForkJoinPool
(parallelism,
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
2
3
4
5
6
7
8
9
10
11
12
13
说明:创建一个维护多个队列的线程池,避免线程的争用,实际的线程数可以动态的变化
# 自定义线程池
接下来我们使用ThreadPoolExecutor
类进行自定义线程池。该类是线程池中最为重要的一个类。首先我们来看该类为我们提供的几个构造器。
# 构造器
- 源码
public class ThreadPoolExecutor extends AbstractExecutorService {
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
通过构造器源码可以看到我们自定义一个线程池最少需要五个参数,那么这些参数都代表什么意思,我们进行详细的解读。
# 参数
corePoolSize
:核心线程数,也是线程池中常驻的线程数,线程池初始化时默认是没有线程的,当任务来临时才开始创建线程去执行任务,当线程池中的线程数目达到核心线程数的时候,后面的任务就会存放到缓存队列中maximumPoolSize
:最大线程数,核心线程数的基础上可能会额外增加一些非核心线程,注意的只有当workQueue
队列填满时才会创建多于corePoolSize
的线程(线程池总线程数不超过maxPoolSize
)keepAliveTime
:线程存活时间,表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于corePoolSize
时,keepAliveTime
才会起作用,直到线程池中的线程数不大于corePoolSize
,即当线程池中的线程数大于corePoolSize
时,如果一个线程空闲的时间达到keepAliveTime
,则会终止,直到线程池中的线程数不超过corePoolSize
unit
:keepAliveTime
的时间单位TimeUnit.DAYS-天, TimeUnit.HOURS-小时, TimeUnit.MINUTES-分钟, TimeUnit.SECONDS-秒,TimeUnit.MILLISECONDS-毫秒,TimeUnit.MICROSECONDS-微妙,TimeUnit.NANOSECONDS-纳秒
workQueue
:队列,用于保存任务的队列,可以为无界、有界、同步移交三种队列类型之一,当池子里的工作线程数大于corePoolSize
时,这时新进来的任务会被放到队列中SynchronousQueue(同步移交队列):队列不作为任务的缓冲方式,可以简单理解为队列长度为零
LinkedBlockingQueue(无界队列):队列长度不受限制,当请求越来越多时(任务处理速度跟不上任务提交速度造成请求堆积)可能导致内存占用过多或OOM
ArrayBlockintQueue(有界队列):队列长度受限,当队列满了就需要创建多余的线程来执行任务
threadFactory
:线程工厂,主要是创建线程,将现场进行命名分组handler
:策略,拒绝处理任务时的策略ThreadPoolExecutor.AbortPolicy-丢弃任务并抛出RejectedExecutionException异常;
ThreadPoolExecutor.DiscardPolicy-丢弃任务,但是不抛出异常;
ThreadPoolExecutor.DiscardOldestPolicy-丢弃队列最前面的任务,然后重新尝试执行任务;
ThreadPoolExecutor.CallerRunsPolicy-由调用线程处理该任务
# 实现
- 1.接下来我们使用
ThreadPoolExecutor
来手动实现一个线程池,首先我们将线程池需要的参数进行抽取到一个类ThreadPoolConfig
里,源码如下:
/**
* @author:triumphxx
* @Date: 2021/12/7
* @Time: 10:12
* @微信公众号:北漂码农有话说
* @网站:https://blog.triumphxx.com.cn
* @GitHub https://github.com/triumphxx
* @Desc: 线程池相关参数配置
**/
public class ThreadPoolConfig {
/**核心线程数*/
public static int corePoolSize = 20;
/** 计算线程最大线程数 */
public static int maximumPoolSize = 20;
/** 线程最大存活时间 */
public final static int keepAliveTime = 10;
/** 程最大存活时间单位 */
public final static TimeUnit timeUnit = TimeUnit.SECONDS;
/** 线程队列深度 */
public final static int threadQueueSize = 1000;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
如上代码我们将定义线程池的参数进行抽取封装。
- 2、实现自己的线程工厂,源码如下
/**
* @author:triumphxx
* @Date: 2021/12/7
* @Time: 10:26
* @微信公众号:北漂码农有话说
* @网站:http://blog.triumphxx.com.cn
* @GitHub https://github.com/triumphxx
* @Desc: 线程工厂,将线程进行分组,给每一个线程起一个名称
**/
public class ThreadPoolFactory implements ThreadFactory {
/** 线程池编号*/
private static final AtomicInteger poolNumber = new AtomicInteger(1);
/** 线程组*/
private final ThreadGroup group;
/** 线程编号*/
private final AtomicInteger threadNumber = new AtomicInteger(1);
/** 线程名称前缀*/
private final String namePrefix;
/** 构造器*/
public ThreadPoolFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-";
}
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
return t;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
- 3、实现线程池管理器,源码如下
/**
* @author:triumphxx
* @Date: 2021/12/7
* @Time: 10:07
* @微信公众号:北漂码农有话说
* @网站:https://blog.triumphxx.com.cn
* @GitHub https://github.com/triumphxx
* @Desc: 自己定义线程池
**/
public class ThreadPoolManager {
private static ThreadPoolExecutor executor = null;
/**
* 单例
*/
private ThreadPoolManager(){
}
public static ThreadPoolExecutor getThreadPool(){
//懒加载
if (executor == null){
executor = new ThreadPoolExecutor(
//核心线程数
ThreadPoolConfig.corePoolSize,
// 最大线程数据
ThreadPoolConfig.maximumPoolSize,
//线程存活时间
ThreadPoolConfig.keepAliveTime,
//线程存活时间单位
ThreadPoolConfig.timeUnit,
//线程队列
new LinkedBlockingQueue<>(ThreadPoolConfig.threadQueueSize),
//线程工厂
new ThreadPoolFactory(),
//达到线程边界和队列容量 线程阻塞时 执行的策略
new ThreadPoolExecutor.DiscardOldestPolicy()
);
}
return executor;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# 使用
将我们前面的同步阻塞IO模型的服务端代码进行改造,操作如下:
将该部分代码在idea
中执行,maven install
名称,打成一个jar
包放到本地仓库,在同步阻塞IO模型项目的pom.xml
中引入
<dependencies>
<dependency>
<groupId>org.triumphxx</groupId>
<artifactId>ThreadPool</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
</dependencies>
2
3
4
5
6
7
在程序中使用我们定义的线程池
public class BIOServer {
public static void main(String[] args) throws IOException {
//创建一个Socket服务
ServerSocket serverSocket = new ServerSocket(9000);
//创建线程池
// ExecutorService executorService = Executors.newCachedThreadPool();
ThreadPoolExecutor threadPool = ThreadPoolManager.getThreadPool();
//服务端启动后等待着客户端的请求
while (true){
System.out.println("等待连接");
//阻塞的方法,等待连接
final Socket socket = serverSocket.accept();
System.out.println("有客户端的连接进来了");
//BIO的特点,每进来一个客户端的请求都需要创建线程去处理
// executorService.execute(new Runnable() {
threadPool.execute(new Runnable(){
@Override
public void run() {
dealMessage(socket);
}
});
}
}
public static void dealMessage(Socket socket) {
System.out.println("线程名称为" + Thread.currentThread().getName());
System.out.println("开始读取信息");
InputStream inputStream = null;
try {
byte[] bytes = new byte[1024];
inputStream = socket.getInputStream();
inputStream.read(bytes);
System.out.println("接收到客户端的信息为:"+new String(bytes));
socket.getOutputStream().write("客户端你好,接收到了你的信息".getBytes());
} catch (IOException e) {
e.printStackTrace();
}finally {
try {
inputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# 小结
本篇文章,我们详细了解了线程池,已经线程池的创建方式,已经自定义线程池和如何使用自定义的线程池,本人的这个小程序,大家可以直接进行下载使用源码地址, 希望对大家有所帮助