思考集结处

vuePress-theme-reco 思考集结处    2024
思考集结处 思考集结处

Choose mode

  • dark
  • auto
  • light
首页
标签
分类
  • AI
  • Docker
  • 分布式事务
  • 文件存储
  • 框架
  • Spring
  • java
  • 其他
  • 搜索引擎
  • 源码
  • 网站
Java
网站
容器技术
搜索引擎
分布式事务
源码系列
框架系列
文件存储
AI
其他
GitHub
author-avatar

思考集结处

43

文章

18

标签

首页
标签
分类
  • AI
  • Docker
  • 分布式事务
  • 文件存储
  • 框架
  • Spring
  • java
  • 其他
  • 搜索引擎
  • 源码
  • 网站
Java
网站
容器技术
搜索引擎
分布式事务
源码系列
框架系列
文件存储
AI
其他
GitHub
  • Java相关技术
  • 网络编程相关概念
  • 同步阻塞IO模型
  • 同步非阻塞IO模型
  • 多路复用IO模型
  • 异步IO模型
  • 多线程源码分析
  • 线程池
  • Spring 事件监听
  • Spring 重试机制
  • Spring 重试机制之listeners参数

线程池

vuePress-theme-reco 思考集结处    2024

线程池

思考集结处 2021-12-07 线程池

线程池的创建方式,以及自定义线程池和如何使用自定义的线程池

# java线程池

本篇我们对java的线程池做一个详细的总结,我们都知道,在需要用到多线程的业务的场景下,就需要用到我们的线程池,顾名思义:就是存放线程的一个容器。 线程在线程池中被创建出来,以供线程进行调度使用。

# 为什么需要线程池

我们都知道java中进行创建线程的方式有三种,继承Thread类,实现Runnable或者实现Callable接口,但是使用这些方式创建的线程会存在频繁创建、 销毁、线程上下文切换的问题,同时创建过多的线程可能引发资源耗尽的风险,所以我们很有必要将线程池引入,来管理线程任务。

# 线程池创建-

Java为我们提供了七种创建线程池的方式,即:使用Executors工厂类自动创建线程的六种方式和使用ThreadPoolExecutor类进行自定义线程池。 我们先来看使用Executors工厂类自动创建线程:

# newFixedThreadPool

  • 源码
public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
}

public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(nThreads, nThreads,
        0L, TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<Runnable>(),
        threadFactory);
}
1
2
3
4
5
6
7
8
9
10
11
12

说明:该线程池的核心线程和最大线程全是重用固定的,以共享的无界队列方式来运行这些线程,超出的线程会在队列中等待。 可以看到,给我们提供了俩个构造方法,因此我们知道threadFactory:线程工厂是我们可以自己定义的,其实线程的七大参数我们都能够进行收到定义,下文 我们手写线程池的时候会详细的介绍每一个参数。

# newCachedThreadPool

  • 源码
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
}

public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
        60L, TimeUnit.SECONDS,
        new SynchronousQueue<Runnable>(),
        threadFactory);
}
1
2
3
4
5
6
7
8
9
10
11
12

说明:创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。若创建过的线程可以使用,则重用那些线程

# newSingleThreadExecutor

  • 源码
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
}

public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
        return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
        0L, TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<Runnable>(),
        threadFactory));
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14

说明:只有一个线程的线程池,能够保证任务顺序执行

# newScheduledThreadPool

  • 源码
 public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
}

public static ScheduledExecutorService newScheduledThreadPool(
        int corePoolSize, ThreadFactory threadFactory) {
        return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
1
2
3
4
5
6
7
8

说明:定时的执行定时任务的线程池,维护一个延时队列

# newSingleThreadScheduledExecutor

  • 源码
    public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
        return new DelegatedScheduledExecutorService
            (new ScheduledThreadPoolExecutor(1));
}

public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
        return new DelegatedScheduledExecutorService
        (new ScheduledThreadPoolExecutor(1, threadFactory));
}
1
2
3
4
5
6
7
8
9

说明:创建一个可以调度命令的单线程执行程序,在给定的延迟后运行,或周期性地执行

# newWorkStealingPool

  • 源码
    public static ExecutorService newWorkStealingPool(int parallelism) {
        return new ForkJoinPool
            (parallelism,
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
}

public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
        (Runtime.getRuntime().availableProcessors(),
        ForkJoinPool.defaultForkJoinWorkerThreadFactory,
        null, true);
}
1
2
3
4
5
6
7
8
9
10
11
12
13

说明:创建一个维护多个队列的线程池,避免线程的争用,实际的线程数可以动态的变化

# 自定义线程池

接下来我们使用ThreadPoolExecutor类进行自定义线程池。该类是线程池中最为重要的一个类。首先我们来看该类为我们提供的几个构造器。

# 构造器

  • 源码
public class ThreadPoolExecutor extends AbstractExecutorService {
    
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                Executors.defaultThreadFactory(), defaultHandler);
    }

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                threadFactory, defaultHandler);
    }

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                Executors.defaultThreadFactory(), handler);
    }

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                Executors.defaultThreadFactory(), handler);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41

通过构造器源码可以看到我们自定义一个线程池最少需要五个参数,那么这些参数都代表什么意思,我们进行详细的解读。

# 参数

  • corePoolSize:核心线程数,也是线程池中常驻的线程数,线程池初始化时默认是没有线程的,当任务来临时才开始创建线程去执行任务,当线程池中的线程数目达到核心线程数的时候,后面的任务就会存放到缓存队列中

  • maximumPoolSize:最大线程数,核心线程数的基础上可能会额外增加一些非核心线程,注意的只有当workQueue队列填满时才会创建多于corePoolSize的线程(线程池总线程数不超过maxPoolSize)

  • keepAliveTime:线程存活时间,表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize,即当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize

  • unit:keepAliveTime的时间单位

    TimeUnit.DAYS-天, TimeUnit.HOURS-小时, TimeUnit.MINUTES-分钟, TimeUnit.SECONDS-秒,TimeUnit.MILLISECONDS-毫秒,TimeUnit.MICROSECONDS-微妙,TimeUnit.NANOSECONDS-纳秒

  • workQueue:队列,用于保存任务的队列,可以为无界、有界、同步移交三种队列类型之一,当池子里的工作线程数大于corePoolSize时,这时新进来的任务会被放到队列中

    SynchronousQueue(同步移交队列):队列不作为任务的缓冲方式,可以简单理解为队列长度为零

    LinkedBlockingQueue(无界队列):队列长度不受限制,当请求越来越多时(任务处理速度跟不上任务提交速度造成请求堆积)可能导致内存占用过多或OOM

    ArrayBlockintQueue(有界队列):队列长度受限,当队列满了就需要创建多余的线程来执行任务

  • threadFactory:线程工厂,主要是创建线程,将现场进行命名分组

  • handler:策略,拒绝处理任务时的策略

    ThreadPoolExecutor.AbortPolicy-丢弃任务并抛出RejectedExecutionException异常;

    ThreadPoolExecutor.DiscardPolicy-丢弃任务,但是不抛出异常;

    ThreadPoolExecutor.DiscardOldestPolicy-丢弃队列最前面的任务,然后重新尝试执行任务;

    ThreadPoolExecutor.CallerRunsPolicy-由调用线程处理该任务

# 实现

  • 1.接下来我们使用ThreadPoolExecutor来手动实现一个线程池,首先我们将线程池需要的参数进行抽取到一个类ThreadPoolConfig里,源码如下:
/**
 * @author:triumphxx
 * @Date: 2021/12/7
 * @Time: 10:12 
 * @微信公众号:北漂码农有话说
 * @网站:https://blog.triumphxx.com.cn
 * @GitHub https://github.com/triumphxx
 * @Desc: 线程池相关参数配置
 **/
public class ThreadPoolConfig {
    /**核心线程数*/
    public static int corePoolSize = 20;
    /** 计算线程最大线程数 */
    public static int maximumPoolSize = 20;
    /** 线程最大存活时间 */
    public final static int keepAliveTime = 10;
    /** 程最大存活时间单位 */
    public final static TimeUnit timeUnit = TimeUnit.SECONDS;
    /** 线程队列深度 */
    public final static int threadQueueSize = 1000;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

如上代码我们将定义线程池的参数进行抽取封装。

  • 2、实现自己的线程工厂,源码如下
/**
 * @author:triumphxx
 * @Date: 2021/12/7
 * @Time: 10:26 
 * @微信公众号:北漂码农有话说
 * @网站:http://blog.triumphxx.com.cn
 * @GitHub https://github.com/triumphxx
 * @Desc: 线程工厂,将线程进行分组,给每一个线程起一个名称
 **/
public class ThreadPoolFactory  implements ThreadFactory {
    /** 线程池编号*/
    private static final AtomicInteger poolNumber = new AtomicInteger(1);
    /** 线程组*/
    private final ThreadGroup group;
    /** 线程编号*/
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    /** 线程名称前缀*/
    private final String namePrefix;
    /** 构造器*/
    public ThreadPoolFactory() {
        SecurityManager s = System.getSecurityManager();
        group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
        namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-";
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(group, r,
                namePrefix + threadNumber.getAndIncrement(),
                0);
        return t;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
  • 3、实现线程池管理器,源码如下
/**
 * @author:triumphxx
 * @Date: 2021/12/7
 * @Time: 10:07 
 * @微信公众号:北漂码农有话说
 * @网站:https://blog.triumphxx.com.cn
 * @GitHub https://github.com/triumphxx
 * @Desc: 自己定义线程池
 **/
public  class  ThreadPoolManager {
    private static ThreadPoolExecutor executor = null;
    /**
     * 单例
      */
    private ThreadPoolManager(){
    }

    public static ThreadPoolExecutor getThreadPool(){
        //懒加载
        if (executor == null){
            executor = new ThreadPoolExecutor(
                    //核心线程数
                    ThreadPoolConfig.corePoolSize,
                    // 最大线程数据
                    ThreadPoolConfig.maximumPoolSize,
                    //线程存活时间
                    ThreadPoolConfig.keepAliveTime,
                    //线程存活时间单位
                    ThreadPoolConfig.timeUnit,
                    //线程队列
                    new LinkedBlockingQueue<>(ThreadPoolConfig.threadQueueSize),
                    //线程工厂
                    new ThreadPoolFactory(),
                    //达到线程边界和队列容量 线程阻塞时 执行的策略
                    new ThreadPoolExecutor.DiscardOldestPolicy()
            );
        }
        return executor;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40

# 使用

将我们前面的同步阻塞IO模型 的服务端代码进行改造,操作如下: 将该部分代码在idea中执行,maven install名称,打成一个jar包放到本地仓库,在同步阻塞IO模型项目的pom.xml中引入

<dependencies>
    <dependency>
      <groupId>org.triumphxx</groupId>
      <artifactId>ThreadPool</artifactId>
      <version>1.0-SNAPSHOT</version>
    </dependency>
</dependencies>
1
2
3
4
5
6
7

在程序中使用我们定义的线程池

public class BIOServer {

    public static void main(String[] args) throws IOException {
        //创建一个Socket服务
        ServerSocket serverSocket = new ServerSocket(9000);
        //创建线程池
//        ExecutorService executorService = Executors.newCachedThreadPool();
         ThreadPoolExecutor threadPool = ThreadPoolManager.getThreadPool();
        //服务端启动后等待着客户端的请求
        while (true){
            System.out.println("等待连接");
            //阻塞的方法,等待连接
            final Socket socket = serverSocket.accept();
            System.out.println("有客户端的连接进来了");
            //BIO的特点,每进来一个客户端的请求都需要创建线程去处理
//            executorService.execute(new Runnable() {
            threadPool.execute(new Runnable(){
                @Override
                public void run() {
                    dealMessage(socket);
                }
            });
        }

    }

    public static void dealMessage(Socket socket) {
        System.out.println("线程名称为" + Thread.currentThread().getName());
        System.out.println("开始读取信息");
        InputStream inputStream = null;
        try {
            byte[] bytes = new byte[1024];
            inputStream = socket.getInputStream();
            inputStream.read(bytes);
            System.out.println("接收到客户端的信息为:"+new String(bytes));
            socket.getOutputStream().write("客户端你好,接收到了你的信息".getBytes());
        } catch (IOException e) {
            e.printStackTrace();
        }finally {
            try {
                inputStream.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47

# 小结

本篇文章,我们详细了解了线程池,已经线程池的创建方式,已经自定义线程池和如何使用自定义的线程池,本人的这个小程序,大家可以直接进行下载使用源码地址 , 希望对大家有所帮助

我是思考集结处欢迎你的关注
看板娘