线程池

来,随我吃透线程池!!!

线程池的作用

线程的创建和销毁的开销是非常大的,线程创建,直接依靠操作系统。大量的线程的创建,会给操作系统和jvm虚拟机带来压力,同时,大量的销毁也会给垃圾回收器带来压力

所以,线程池的目的就是为了解决两个问题

1反复创建线程开销大

2过多的线程太多占用内存

线程池:通过少量线程的复用,成功的解决了这个问题

线程池的好处:

加快响应速度,合理利用cpu与内存,统一管理线程这些线程。

适用线程池的场合

1服务器,服务器要收到大量请求,比如tomcat服务器,也是用线程池实现的

2开发中,5个以上的线程,就可用用线程池了

线程池的创建

  • 核心参数配置说明

参数

说明

corePoolSize

线程池创建的核心线程数,线程池维护线程的最少数量,即使没有任务需要执行,也会一直存活

maximumPoolSize

最大线程池数量,当线程数>=corePoolSize,且任务队列已满时。线程池会创建新线程来处理任务

keepAliveTime

当线程空闲时间达到keepAliveTime时,线程会退出,直到线程数量=corePoolSize

TimeUnit

时间单位

workQueue

缓存队列(阻塞队列)当核心线程数达到最大时,新任务会放在队列中排队等待执行

threadFactory

线程创建的工厂,一般用默认的 Executors.defaultThreadFactory()

handler

当pool已经达到max size的时候,如何处理新任务

这里我们先说一下线程创建的参数,

corePoolsize maxNumPoolSize worekQueue

线程创建,会先创建corePoolsize的线程数量,比如5,当阻塞队列满的时候,比如100,此时线程数量会变大,还处理不过来,就会到maxNumPoolSize最大线程数量,

注意点:弱corePoolSize与maxNumPoolSize保持一致,线程池的大小就是固定的

线程池期望保留线程更少一点,通过corePoolSize在阻塞队列满的时候,才会增加,可以看出来

两个特殊的场景,当阻塞队列大小设置成Integer.MaxValue,那么此时线程数量将不会大于corePoolSize

若maxNumPoolSize设置成Integer.MaxValue,那么此时策略是:允许线程池的容量无限扩大!

keepAliveTime,这里就是多余corePoolSize,大于核心线程数的数量的线程,空闲时间到达这个值 就会被回收

image-20230726145156983

线程工厂,ThreadFactory这个参数直到是创建线程的就可以了,源码这里也无非是new Thread

主要再将一下

工作队列,worKQueue

1)直接交接,SynchronousQueue

2)无界队列:LinkedBlockingQueue

当任务太多,处理不过来,就会导致OOM,内存溢出,而且会让任务丢失

这个队列maxPoolSize是无意义的,因为这个队列满不了

3)有界队列:ArrayBlockingQueue

这个队列是可以满的,maxPoolsize有意义

手动创建or自动创建

有人可能看到过alibaba编码规范

image-20230726145930194

自动创建的话,juc提供了

    • JUC包下的Executors工具类提供多种线程池

    线程池名称 说明 newFixedThreadPool 一个定长线程池,可控制线程最大并发数 newCachedThreadPool 一个可缓存线程池, newSingleThreadExecutor 一个单线程化的线程池,用唯一的工作线程来执行任务 newScheduledThreadPool 一个定长线程池,支持定时/周期性任务执行

newFixedThreadPool

看名字就能想到,线程池的数量是固定的,那么通过上面的分析,能猜到,核心线程数corePoolSize的数量和maxNumPoolSize最大线程数应该是一致的,

代码语言:javascript
复制
/**
 * @Author:Joseph
 * @bolg:https://li-huancheng.gitee.io/
 * @Package:threadPool
 * @Project:bing-fa-demo
 * @name:ExecutorDemo
 * @Date:2023-07-26 15:03
 * @Filename:ExecutorDemo
 */
public class ExecutorDemo {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(4);
        for (int i = 0; i < 1000; i++) {
            executorService.execute(new Task());
        }
    }
}
class Task implements Runnable{
    @Override
    public void run() {
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName());
    }
}
image-20230726152736496

通过控制台,可以看出,就是编码时候 的4个线程数

代码语言:javascript
复制
*/
   public static ExecutorService newFixedThreadPool(int nThreads) {
       return new ThreadPoolExecutor(nThreads, nThreads,
                                     0L, TimeUnit.MILLISECONDS,
                                     new LinkedBlockingQueue<Runnable>());
   }

构造函数中,核心线程数,最大线程数,都是传入的参数nThreads

线程存货时间keepAliveTime是0L,后面的单位就不说了,这个0L,就是,根本没有非核心线程的线程,所以这个参数没什么意义

LinkedBlockQueue,这个工作队列是无限长的,所以当任务量过大的时候,都堆积再linkedBlokQueue阻塞队列中,太多了,内存扛不住,就会出现OOM

代码语言:javascript
复制
package threadPool;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**

  • @Author:Joseph

  • @bolg:https://li-huancheng.gitee.io/

  • @Package:threadPool

  • @Project:bing-fa-demo

  • @name:FixedThreadPoolExecutorOOM

  • @Date:2023-07-26 15:34

  • @Filename:FixedThreadPoolExecutorOOM
    */
    public class FixedThreadPoolExecutorOOM {

    public static void main(String[] args) {
    ExecutorService executorService = Executors.newFixedThreadPool(1);
    for (int i = 0; i < Integer.MAX_VALUE; i++) {
    executorService.execute(new SubThread());
    }
    }

}
class SubThread implements Runnable{
@Override
public void run() {
try {
Thread.sleep(1000000000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

这是造成OOM的一个小demo,配置下jvm内存小一点就可以看到

newSingleThreadExecutor

这个就不掩饰了,就是暴漏的问题和上面一样,线程存货时间keepAliveLife是没意义的

然后核心线程数和最大线程数是1,阻塞队列采取的是无界队列LinkedBlokingQueue

这两个例子就是阿里巴巴编码规范中说的,大量请求堆积导致的OOM

再看下面两种,

先说下结果

下面两种是创建线程数量太多,大量的线程而导致的OOM

newCachedThreadPool

他的功能是缓存线程池,可回收多余的线程

这个通过上面参数的讲解,可以猜测到,工作队列一定不是无界队列LinkedBlockingQueue,不然线程数量是不会大于corePoolSize核心线程数的

他采用的是SynchronousQueue,直接交换队列,也就是说,这个队列存不了东西,任务直接走到线程去,然后线程不够的话,就直接创建

所以他会导致大量的线程导致oom,线程不用可回收,那么它的keepAliveTime一定配置了个值

现在看一下源码

代码语言:javascript
复制
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

核心线程为0,最大线程无限,keepAliveTime最大存活时间60s,队列采用直接交换队列,队列不缓冲任务

和上面的分析一样

newScheduledThreadPool

支持定时,周期型的执行任务

代码语言:javascript
复制
**

  • @Author:Joseph
  • @bolg:https://li-huancheng.gitee.io/
  • @Package:threadPool
  • @Project:bing-fa-demo
  • @name:ScheduledThreadPool
  • @Date:2023-07-26 15:55
  • @Filename:ScheduledThreadPool
    /
    public class ScheduledThreadPool {
    public static void main(String[] args) {
    ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(10);
    //4s后再执行
    scheduledExecutorService.schedule(new Task(),4, TimeUnit.SECONDS);
    //以一定频率重复运行,最开始等1s钟,后面每3s的执行
    scheduledExecutorService.scheduleAtFixedRate(new Task1(),1,3,TimeUnit.SECONDS);
    }
    }
    class Task1 implements Runnable{
    @Override
    public void run() {
    System.out.println("test");
    }
    }

这里写了下常见 的使用方法

这个既然有线程过多的情况,那么它的maxNumPoolSize最大线程数一定是Integer.MaxValue

代码语言:javascript
复制
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue());
}

这里的话,有核心线程数,但是最大线程数是MaxValue,有配置活时间,主要是采用的是延迟工作队列。

这里又多看到一个工作队列

一共有

SynchronousQueue直接交接队列,特点是长度为0

LinkedBlockingQueue无界队列,特点是无界

ArrayBlockingQueue有界队列,特点是有范围

DelayedWorkQueue()延时工作队列,特点是能延迟处理任务

手动创建

看见,jdk提供的,都有bug,都会导致OOM,所以业务使用要自己配置

那么线程池的数量设置成多少何时?

cpu密集型与IO密集型

业务时加密,计算hash等,cpu密集型的话,就可以设置大小为cpu的1~2倍,8核cpu的话,就可以将核心线程数设置为8-16

数据库的读写,文件,网络id这样的,io密集型的话,因为cpu的速度比io快,所以可以将核心线程数设置的多一些,10倍也可以的,

因为80个很多都是在等待io的,所以这样的话才能更好的利用cpu

Brain Goetz大佬就给除了一个公式

线程数= cpu核心数(1+平均等待时间/平均工作时间),当然真实使用的话,还是压测真实环境。估测的话,这个公式就可以的

jdk常见的线程池

通过上面的选择手动创建or自动创建

我们来总结一下jdk提供的线程池

1fixedThreadPool,这种线程池coro和max线程数一致,固定下线程

2cacheThreadPool,这种就是可缓存线程,线程会自动回收

3ScheduleThreadPool.定期执行任务,定时的执行

4singleThreadThreadExcecutor,就是一个线程,

上面我已经讲的很详细了,只需要注意,

1,4会因任务在工作队列的堆积造成OOM

这两个队列的选择就是为了满足需求嘛,线程固定了,那么任务只能通过队列来堆积,所以采用LinedBlocking’queue

2\4会因为线程数量的过大,导致OOM

cachedThreadPool选择SynchronousQueue目的就是全给线程,不需要在队列中转,提高效率

secheduledThread这个的队列就不用说了吧,延时功能

jdk8新的线程池

workStealingPool

特点:子任务 窃取

这里这样理解,三个线程,各个有自己的队列,他们也有公共的队列,

第一个线程自己创建了3个子任务,执行,另外两个线程会帮线程1去执行

这样使用有两个注意,适用不加锁的场景,这样才能让别的线程帮忙执行 ,另外就是执行的顺序不保证,因为会窃取嘛

场景:比较少,

线程池的关闭

线程池的关闭也是有些讲究的!

shutDown

发出线程终止的命令,但是并非是马上的关闭,会等队列中的任务全搞完,就关闭,

同时不会去接受新的任务

isShutDown

判断是否shutDown

isTerminated

这个是判断是否真的关闭了,因为执行shutDown要等待全关掉

awaitTermination

等待一段时间后来判断是否真的关闭

shutDownNow

这个命令会马上关闭,正在执行的线程会中断,在队列中为被消费的,会返回

代码语言:javascript
复制
/**
  • @Author:Joseph
  • @bolg:https://li-huancheng.gitee.io/
  • @Package:threadPool
  • @Project:bing-fa-demo
  • @name:ShutDown
  • @Date:2023-07-28 15:59
  • @Filename:ShutDown
    */
    public class ShutDown {
    public static void main(String[] args) {
    ExecutorService executorService = Executors.newFixedThreadPool(10);
    for (int i = 0; i < 1000; i++) {
    executorService.execute(new ShutDownTask());
    }
    executorService.shutdown();
    }
    }
    class ShutDownTask implements Runnable{
    @Override
    public void run() {
    try {
    Thread.sleep(500);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    }
  • 这里就比较简单了,大家自己测试,不是本文的重点

    拒接策略

    这里回顾一下重要的参数

    最大线程数,核心线程数,最大存活时间,工作队列,没错,还有一个就是现在要讲的拒绝策略

    拒绝时机

    1,当Executior执行shutDown命令,就不会再接受新的任务了

    2、工作队列长度,或者最大线程数饱和

    四种拒绝策略

    AbortPolicy

    直接抛出异常

    DiscardPolicy

    丢弃新的任务,我们是不知道的!

    DiscardPolicyOldestPolicy

    这个是丢掉最老的任务。

    CallerRunsPolicy

    让提交任务的线程去执行,比如异步功能,想要让线程池去做,但是线程池饱和了,线程池说,我不做,你自己做,这个就是CallerRunsPolicy,这个就是不像被压榨的员工哈哈

    钩子函数

    我们想在每个任务执行前后做一些日志统计,等任务,比如暂停,实现一个暂停的线程池

    代码语言:javascript
    复制
    package threadPool;
    
    

    import java.util.concurrent.*;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.ReentrantLock;

    /**

    • 延时每个任务执行的前后都可以放钩子函数

    • @Author:Joseph

    • @bolg:https://li-huancheng.gitee.io/

    • @Package:threadPool

    • @Project:bing-fa-demo

    • @name:PauseableThreadPool

    • @Date:2023-07-28 16:31

    • @Filename:PauseableThreadPool
      */
      public class PauseableThreadPool extends ThreadPoolExecutor {

      private final ReentrantLock lock= new ReentrantLock();

      private Condition unpaused = lock.newCondition();

      private boolean isPaused;

      public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
      super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
      }

      public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
      super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
      }

      public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
      super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
      }

      public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
      super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
      }

      @Override
      protected void beforeExecute(Thread t, Runnable r) {
      super.beforeExecute(t, r);
      lock.lock();
      try{
      while(isPaused){
      unpaused.await();
      }
      } catch (InterruptedException e) {
      e.printStackTrace();
      }finally {
      lock.unlock();
      }
      }

      private void pause(){
      lock.lock();
      try{
      isPaused = true;
      }finally {
      lock.unlock();
      }
      }

      public void resume(){
      lock.lock();
      try{
      isPaused=false;
      unpaused.signalAll();
      }finally {
      lock.unlock();
      }
      }

      public static void main(String[] args) throws InterruptedException {
      PauseableThreadPool pauseableThreadPool = new PauseableThreadPool(10, 20, 10L, TimeUnit.SECONDS, new LinkedBlockingDeque<>());
      Runnable runnable = new Runnable() {
      @Override
      public void run() {
      System.out.println("我被执行");
      try {
      Thread.sleep(10);
      } catch (InterruptedException e) {
      e.printStackTrace();
      }
      }
      };
      for (int i = 0; i < 10000; i++) {
      pauseableThreadPool.execute(runnable);

       }
       Thread.sleep(1500);
       pauseableThreadPool.pause();
       System.out.println(&#34;线程池被暂停了&#34;);
       Thread.sleep(1500);
       pauseableThreadPool.resume();
       System.out.println(&#34;线程池恢复了&#34;);
      

      }
      }

    线程池实现源码

    上面了解了下线程池的使用注意点,现在看下线程池怎么实现的

    线程池的组成部分

    线程池管理器:创建、管理线程池

    工作线程:就是线程池中存在的线程

    任务队列:这个就是参数里重要之一的工作队列,因为并发嘛,多个线程去取线程,所以采用的阻塞队列blokingQueue

    任务接口(Task):这个就是线程池要执行的一个一个的任务

    Executor家族的区分

    ThreadPoolExecutor、ExecutorService、Executor、Executors

    image-20230728172158364

    这是他们的继承实现关系

    Executor

    代码语言:javascript
    复制
    public interface Executor {

    /**

    • Executes the given command at some time in the future. The command
    • may execute in a new thread, in a pooled thread, or in the calling
    • thread, at the discretion of the {@code Executor} implementation.
    • @param command the runnable task
    • @throws RejectedExecutionException if this task cannot be
    • accepted for execution
    • @throws NullPointerException if command is null
      */
      void execute(Runnable command);
      }

    Executor是一个顶级的接口,只有这一个execute的方法

    ExecutorService

    代码语言:javascript
    复制
    public interface ExecutorService extends Executor {
    void shutdown();
    List<Runnable> shutdownNow();
    boolean isShutdown();
    boolean isTerminated();
    boolean awaitTermination(long timeout, TimeUnit unit)
    throws InterruptedException;
    Future<T> submit(Callable<T> task);
    Future<T> submit(Runnable task, T result);
    Future<?> submit(Runnable task);
    List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
    throws InterruptedException;
    List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
    long timeout, TimeUnit unit)
    throws InterruptedException;
    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
    throws InterruptedException, ExecutionException;
    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
    long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException;

    }

    ExecutorService继承了Executor,并新增了一些管理线程池的方法

    Executors

    一个工具类这里就不多说了,上面讲解的很透彻,只是调用创建线程池的构造函数,指定一些规则而已

    ThreadPoolExecutor

    这里就要着重讲一下了,这里他是ExecutorService的实现,返回值是ExecutorService

    也是我们开发人员调用这个构造函数自定义线程池使用的,文章末尾会带大家实操一下

    代码语言:javascript
    复制
    public class ThreadPoolTaskExecutor extends ExecutorConfigurationSupport implements AsyncListenableTaskExecutor, SchedulingTaskExecutor {
    private final Object poolSizeMonitor = new Object();
    private int corePoolSize = 1;
    private int maxPoolSize = 2147483647;
    private int keepAliveSeconds = 60;
    private int queueCapacity = 2147483647;
    private boolean allowCoreThreadTimeOut = false;

    线程池如何实现线程的复用

    这是ThreadPoolExecutor中额execute方法

    org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor

    代码语言:javascript
    复制
    public void execute(Runnable command) {
    if (command == null)
    throw new NullPointerException();

        int c = ctl.get();
    	//线程不够,增加线程
        if (workerCountOf(c) &lt; corePoolSize) {
            //我们要看的就是wordker方法
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) &amp;&amp; workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) &amp;&amp; remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }</code></pre></div></div><p>好,看一下work中的runWorker方法</p><p>java.util.concurrent.ThreadPoolExecutor#runWorker</p><div class="rno-markdown-code"><div class="rno-markdown-code-toolbar"><div class="rno-markdown-code-toolbar-info"><div class="rno-markdown-code-toolbar-item is-type"><span class="is-m-hidden">代码语言:</span>javascript</div></div><div class="rno-markdown-code-toolbar-opt"><div class="rno-markdown-code-toolbar-copy"><i class="icon-copy"></i><span class="is-m-hidden">复制</span></div></div></div><div class="developer-code-block"><pre class="prism-token token line-numbers language-javascript"><code class="language-javascript" style="margin-left:0">final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
    	//注意这里的taskRunnablerun方法就是一个一个的类
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            //主要看这里,如果task不为null,就去执行这里的逻辑,getTask就是从阻塞队列中拿出任务
            //while循环代表,这个work不会停止,执行完任务就会继续拿下一个任务去执行,这就实现了线程的复用
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &amp;&amp;
                      runStateAtLeast(ctl.get(), STOP))) &amp;&amp;
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    try {
                        //这里就是调用runnablerun方法,
                        task.run();
                        afterExecute(task, null);
                    } catch (Throwable ex) {
                        afterExecute(task, ex);
                        throw ex;
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }</code></pre></div></div><h3 id="fchf8" name="%E7%BA%BF%E7%A8%8B%E6%B1%A0%E7%9A%84%E7%8A%B6%E6%80%81">线程池的状态</h3><div class="table-wrapper"><table><thead><tr><th style="text-align:left"><div><div class="table-header"><p>状态</p></div></div></th><th style="text-align:left"><div><div class="table-header"><p>说明</p></div></div></th></tr></thead><tbody><tr><td style="text-align:left"><div><div class="table-cell"><p>RUNNING</p></div></div></td><td style="text-align:left"><div><div class="table-cell"><p>运行状态,能接受新提交的任务,并且也能处理阻塞队列中的任务</p></div></div></td></tr><tr><td style="text-align:left"><div><div class="table-cell"><p>SHUTDOWN</p></div></div></td><td style="text-align:left"><div><div class="table-cell"><p>关闭状态,不再接受新提交的任务,可以继续处理阻塞队列中已保存的任务。</p></div></div></td></tr><tr><td style="text-align:left"><div><div class="table-cell"><p>STOP</p></div></div></td><td style="text-align:left"><div><div class="table-cell"><p>不能接受新任务,也不处理队列中的任务,会中断正在处理任务的线程</p></div></div></td></tr><tr><td style="text-align:left"><div><div class="table-cell"><p>TIDYING</p></div></div></td><td style="text-align:left"><div><div class="table-cell"><p>所有的任务都已终止了,workerCount (有效线程数)0</p></div></div></td></tr><tr><td style="text-align:left"><div><div class="table-cell"><p>TERMINATED</p></div></div></td><td style="text-align:left"><div><div class="table-cell"><p>terminated() 方法执行完后进入该状态</p></div></div></td></tr></tbody></table></div><p>通过上面线程关闭的学习,再结合这里看一下</p><p>shutDown会使Running状态到SHUTDOWN状态,线程池会拒绝新任务,但是还是会执行新的任务</p><p>shutDOwnNow让线程池从Running状态到STOP状态,此时拒绝新任务,同时也会中断正在执行的任务,同时会返回队列中的任务</p><p>当上面两种方法执行完,队列和工作线程都空时,会进入tidying状态,执行termined会进入TERMINATED关闭状态。</p><p>在这里站在上帝视角看下源码</p><p>无非就是</p><p>executeaddworker runworkergetTask方法</p><p>这里要先将一下,ctl这个</p><p>变量 <code>ctl</code>这个AtomicInteger包含两部分的信息,使用的是位运算的方式,相比于基本运算,速度快很多</p><ul class="ul-level-0"><li>运行状态 (runState) 3位保存</li><li>线程池内有效线程的数量 (workerCount),低29位保存</li></ul><p>下面是变量和比较重要的方法</p><div class="rno-markdown-code"><div class="rno-markdown-code-toolbar"><div class="rno-markdown-code-toolbar-info"><div class="rno-markdown-code-toolbar-item is-type"><span class="is-m-hidden">代码语言:</span>javascript</div></div><div class="rno-markdown-code-toolbar-opt"><div class="rno-markdown-code-toolbar-copy"><i class="icon-copy"></i><span class="is-m-hidden">复制</span></div></div></div><div class="developer-code-block"><pre class="prism-token token line-numbers language-javascript"><code class="language-javascript" style="margin-left:0">public class ThreadPoolExecutor extends AbstractExecutorService {
    /**
     *   workerCount, indicating the effective number of threads
     *   runState,    indicating whether running, shutting down etc
    
     * RUNNING -&gt; SHUTDOWN
     *    On invocation of shutdown()
     * (RUNNING or SHUTDOWN) -&gt; STOP
     *    On invocation of shutdownNow()
     * SHUTDOWN -&gt; TIDYING
     *    When both queue and pool are empty
     * STOP -&gt; TIDYING
     *    When pool is empty
     * TIDYING -&gt; TERMINATED
     *    When the terminated() hook method has completed
     *
     */
    //int类型的数字,高3位表示线程池状态,低29位表示worker数量
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    //Integer.SIZE`32,所以`COUNT_BITS`29
    private static final int COUNT_BITS = Integer.SIZE - 3;
    //线程池允许的最大线程数, 1左移29位,然后减1,即为 2^29 - 1
    private static final int COUNT_MASK = (1 &lt;&lt; COUNT_BITS) - 1;
    
    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 &lt;&lt; COUNT_BITS;
    private static final int SHUTDOWN   =  0 &lt;&lt; COUNT_BITS;
    private static final int STOP       =  1 &lt;&lt; COUNT_BITS;
    private static final int TIDYING    =  2 &lt;&lt; COUNT_BITS;
    private static final int TERMINATED =  3 &lt;&lt; COUNT_BITS;
    
    //获取线程池状态
    private static int runStateOf(int c)     { return c &amp; ~COUNT_MASK; }
    //获取线程池worker数量
    private static int workerCountOf(int c)  { return c &amp; COUNT_MASK; }
    //根据线程池状态和线程池worker数量,生成ctl
    private static int ctlOf(int rs, int wc) { return rs | wc; }</code></pre></div></div><div class="rno-markdown-code"><div class="rno-markdown-code-toolbar"><div class="rno-markdown-code-toolbar-info"><div class="rno-markdown-code-toolbar-item is-type"><span class="is-m-hidden">代码语言:</span>javascript</div></div><div class="rno-markdown-code-toolbar-opt"><div class="rno-markdown-code-toolbar-copy"><i class="icon-copy"></i><span class="is-m-hidden">复制</span></div></div></div><div class="developer-code-block"><pre class="prism-token token line-numbers language-javascript"><code class="language-javascript" style="margin-left:0">//任务执行
    

    public void execute(Runnable command) {
    if (command == null)
    throw new NullPointerException();
    // 32位,高3位存储线程池状态,低29位存储活跃线程数
    int c = ctl.get();
    //判断工作线程小于核心线程,则创建新线程,true表示是核心线程数
    if (workerCountOf(c) < corePoolSize) {
    if (addWorker(command, true))
    return;
    c = ctl.get();
    }
    //判断线程池是否运行,把任务放到队列里面去,返回boolean状态
    if (isRunning(c) && workQueue.offer(command)) {
    //再次获取值
    int recheck = ctl.get();
    //如果线程池已经终止,则移除任务,不在响应
    if (! isRunning(recheck) && remove(command))
    reject(command);
    //如果没有线程,则创建一个空的worker,会从队列获取任务执行
    else if (workerCountOf(recheck) == 0)
    addWorker(null, false);
    }
    //队列满后,调用addWorker,创建非核心线程,参数是false,
    else if (!addWorker(command, false))
    //队列已满,创建非核心线程,失败则执行拒绝策略
    reject(command);
    }

    //用于向线程池中添加一个新的工作线程。如果线程池中的线程数量已经达到maximumPoolSize,则返回false;
    //如果线程池已经关闭,则返回false;否则,创建一个新的工作线程,并将其加入工作线程集合中
    private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (int c = ctl.get();;) {
    // Check if queue empty only if necessary.
    if (runStateAtLeast(c, SHUTDOWN)
    && (runStateAtLeast(c, STOP)
    || firstTask != null
    || workQueue.isEmpty()))
    return false;

            for (;;) {
                //判断线程数,根据传进来参数判断是创建线程数最大值
                if (workerCountOf(c)
                    &gt;= ((core ? corePoolSize : maximumPoolSize) &amp; COUNT_MASK))
                    return false;
                    //增加worker数量成功,返回到上面的retry
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateAtLeast(c, SHUTDOWN))
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
    
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int c = ctl.get();
    
                    if (isRunning(c) ||
                        (runStateLessThan(c, STOP) &amp;&amp; firstTask == null)) {
                        if (t.getState() != Thread.State.NEW)
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        workerAdded = true;
                        int s = workers.size();
                        if (s &gt; largestPoolSize)
                            largestPoolSize = s;
                    }
                } finally {
                    mainLock.unlock();
                }
                //新创建的worker,然后立刻启动,立刻执行任务(不是从队列中获取)
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
    

    //用于执行指定的工作线程,首先获取当前线程,然后不断从阻塞队列中取出任务并执行,直到从阻塞队列中取出null为止。
    //在每次执行任务之前,会调用beforeExecute()方法和afterExecute()方法,这两个方法可以由子类进行重写,以实现一些特定的功能。

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
        
        //一直循环判断,当前任务是否有,没的话getTask()从队列中获取任务执行
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &amp;&amp;
                      runStateAtLeast(ctl.get(), STOP))) &amp;&amp;
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    try {
                        task.run();
                        afterExecute(task, null);
                    } catch (Throwable ex) {
                        afterExecute(task, ex);
                        throw ex;
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }
    

    //用于从阻塞队列中获取一个任务,如果当前线程数小于corePoolSize,则会调用workQueue的take方法阻塞在当前
    private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
    
            // Check if queue empty only if necessary.
            if (runStateAtLeast(c, SHUTDOWN)
                &amp;&amp; (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }
    
            int wc = workerCountOf(c);
    
            // timed用于超时控制,当allowCoreThreadTimeOut是true或者活跃线程数大于核心线程数,则需要进行超时控制
            boolean timed = allowCoreThreadTimeOut || wc &gt; corePoolSize;
    
            if ((wc &gt; maximumPoolSize || (timed &amp;&amp; timedOut))
                &amp;&amp; (wc &gt; 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }
            //take和poll都是从队列头部【拿出】一个元素,从头部得到并移除该元素
            //poll空队列的头部元素时返回null,不抛异常;而take方法对应获得空队列的头部元素时,会阻塞在获取的位置   
            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }</code></pre></div></div><p>关于源码的解读,大家看注释就可以</p><p>我想着重点一下,这个阻塞队列,take方法,这里就体现阻塞队列的特性了,take方法,不会返回元素,而是阻塞,</p><p>还有就是work类中的works,本质是一个HashSet集合</p><figure class=""><div class="rno-markdown-img-url" style="text-align:center"><div class="rno-markdown-img-url-inner" style="width:83.93%"><div style="width:100%"><img src="https://cdn.static.attains.cn/app/developer-bbs/upload/1722963925354679855.png" /></div><div class="figure-desc">image-20230728203619289</div></div></div></figure><p>再者需要注意runworker中执行前后的两个钩子</p><figure class=""><div class="rno-markdown-img-url" style="text-align:center"><div class="rno-markdown-img-url-inner" style="width:83.93%"><div style="width:100%"><img src="https://cdn.static.attains.cn/app/developer-bbs/upload/1722963925756821685.png" /></div><div class="figure-desc">image-20230728203836873</div></div></div></figure><p>前面提到,核心线程数是不会消失的,但是,这里指的是永远是数量,HashSet这里并没有区分线程是先创建的还是后创建的,</p><p>所以core核心线程数为3,假设有6个,剩下来的3个不一定是最初创建的线程!</p><h2 id="bl749" name="%E5%AE%9E%E6%88%98%E4%B8%BA%E7%8E%8B">实战为王</h2><p>这里讲一下工作中,如何创建线程池</p><p>先看一下,使用线程池的注意点</p><p>避免任务堆积</p><p>避免线程数过度增加</p><p>排查线程泄漏(线程回收不了的情况):一般是任务逻辑问题,导致任务结束不了,导致任务回收不了</p><p>如果有人问:工作中线程池是怎样做的?我认为可以这样回答</p><p>线程数的配置,要考虑io密集型和cpu密集型,io密集型可以设置2倍的核数,cpu密集型最好是和核数相等</p><p>阻塞队列的长度可以区分面向C端的快速响应还是面向B端的允许慢速处理的场景</p><p>C端的阻塞队列长度不能太长,另外C端,可以通过多节点,来增快消费速度,避免堆积,B端可以长一些</p><p>当然要结合业务,进行多接口压测,才能得到合理的数据。</p><p>这里列举两个场景。(以32核,64g为例)</p><p>C端:</p><div class="rno-markdown-code"><div class="rno-markdown-code-toolbar"><div class="rno-markdown-code-toolbar-info"><div class="rno-markdown-code-toolbar-item is-type"><span class="is-m-hidden">代码语言:</span>javascript</div></div><div class="rno-markdown-code-toolbar-opt"><div class="rno-markdown-code-toolbar-copy"><i class="icon-copy"></i><span class="is-m-hidden">复制</span></div></div></div><div class="developer-code-block"><pre class="prism-token token line-numbers language-javascript"><code class="language-javascript" style="margin-left:0">ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 32, 3,
       TimeUnit.SECONDS, new ArrayBlockingQueue&lt;Runnable&gt;(100000),
        new ThreadPoolExecutor.DiscardPolicy());</code></pre></div></div><p>这个C端的场景,就没有考虑上面说的,阻塞队列太长了,而核心线程数有过短,所以这里,就会出现问题,任务堆积而得不到消费</p><p>C端的场景不应该设置过长的策略,且要注意消费的速度要大一些</p><div class="rno-markdown-code"><div class="rno-markdown-code-toolbar"><div class="rno-markdown-code-toolbar-info"><div class="rno-markdown-code-toolbar-item is-type"><span class="is-m-hidden">代码语言:</span>javascript</div></div><div class="rno-markdown-code-toolbar-opt"><div class="rno-markdown-code-toolbar-copy"><i class="icon-copy"></i><span class="is-m-hidden">复制</span></div></div></div><div class="developer-code-block"><pre class="prism-token token line-numbers language-javascript"><code class="language-javascript" style="margin-left:0">ThreadPoolExecutor threadPool = new ThreadPoolExecutor(32,128, 60,
       TimeUnit.SECONDS, new ArrayBlockingQueue&lt;Runnable&gt;(1000),
        new ThreadPoolExecutor.DiscardPolicy());</code></pre></div></div><p>一台机器不够,那就多一些</p><p>B端</p><p>商家管理后台统计报表</p><div class="rno-markdown-code"><div class="rno-markdown-code-toolbar"><div class="rno-markdown-code-toolbar-info"><div class="rno-markdown-code-toolbar-item is-type"><span class="is-m-hidden">代码语言:</span>javascript</div></div><div class="rno-markdown-code-toolbar-opt"><div class="rno-markdown-code-toolbar-copy"><i class="icon-copy"></i><span class="is-m-hidden">复制</span></div></div></div><div class="developer-code-block"><pre class="prism-token token line-numbers language-javascript"><code class="language-javascript" style="margin-left:0">ThreadPoolExecutor threadPool = new ThreadPoolExecutor(8,1024, 60,
       TimeUnit.SECONDS, new ArrayBlockingQueue&lt;Runnable&gt;(1000),
        new ThreadPoolExecutor.DiscardPolicy());</code></pre></div></div><p>这里没有考虑B端场景,队列可以长一些,因为不要求实时性,这样配置,1024线程,导致OOM,服务器崩溃</p><div class="rno-markdown-code"><div class="rno-markdown-code-toolbar"><div class="rno-markdown-code-toolbar-info"><div class="rno-markdown-code-toolbar-item is-type"><span class="is-m-hidden">代码语言:</span>javascript</div></div><div class="rno-markdown-code-toolbar-opt"><div class="rno-markdown-code-toolbar-copy"><i class="icon-copy"></i><span class="is-m-hidden">复制</span></div></div></div><div class="developer-code-block"><pre class="prism-token token line-numbers language-javascript"><code class="language-javascript" style="margin-left:0">ThreadPoolExecutor threadPool = new ThreadPoolExecutor(32,124, 60,
       TimeUnit.SECONDS, new ArrayBlockingQueue&lt;Runnable&gt;(100000),
        new ThreadPoolExecutor.DiscardPolicy());</code></pre></div></div><p>这样配置,让任务能过慢慢处理,同时队列又不是长的很过分,100万,并不会出现OOM的问题</p><p>另外</p><p>生产配置的时候,要线程池的隔离,不能让多个不同任务公用线程池,</p><h2 id="advmj" name="%E5%BC%82%E6%AD%A5%E6%8F%90%E9%AB%98qps">异步提高qps</h2><p>到这里,线程池基本就完结了</p><p>这里提供之前做项目的一个场景,这里用到了http连接池,和线程池,通过这个例子,我们看一下,工作中,如何用线程池,以及考虑引发的一些问题,</p><h3 id="a8hke" name="%E5%BC%82%E6%AD%A5%E7%9A%84%E4%BD%BF%E7%94%A8%E5%9C%BA%E6%99%AF%E4%BB%A5%E5%8F%8A%E4%B8%80%E4%BA%9B%E6%B3%A8%E6%84%8F%E4%BA%8B%E9%A1%B9">异步的使用场景以及一些注意事项</h3><p>涉及网络通信,且有需要提高响应速度,就可以用异步,</p><p>MQ也属于一种异步,</p><p>适用于处理log,发送邮件,短信–等场景,涉及网络io调用,并不影响业务,因为异步是直接返回正常的,不知道后面是否成功</p><p>使用方式:启动类,开启异步@EnableAsync </p><p>异步失效场景:</p><p>异步采用动态代理,不能调用类本身的方法,加@Async,</p><ul class="ul-level-0"><li>注解@Async的方法不是public方法</li><li>注解@Async的返回值只能为void或者Future</li><li>注解@Async方法使用static修饰也会失效</li></ul><p>还要注意事物,@Tranctional与@Async会失效,但是,在调用异步的上游,就没事</p><h3 id="fkaen" name="%E7%BA%BF%E7%A8%8B%E6%B1%A0">线程池</h3><p>当我们使用@Async的时候,没有自定义线程池,他会使用默认的线程池</p><p>默认8个核心线程数,核心线程处理不了,就会进入阻塞队列,阻塞队列是默认的Integer.MAX_VALUE,21亿!采用的是LinkedBlokingQueue,最大线程数也是21亿,但是根本不会大于8,因为阻塞队列满了才会到达,但是不可能满的</p><p>所以就要自定义线程池,</p><p>但是!</p><p>在spring中,自定义线程池要用ThreadPoolTaskExecutor,其实是一样的,只是框架多了层包装而已</p><p>我们只需要自定义线程池,覆盖spring默认的线程池,就能避免OOM的问题了</p><div class="rno-markdown-code"><div class="rno-markdown-code-toolbar"><div class="rno-markdown-code-toolbar-info"><div class="rno-markdown-code-toolbar-item is-type"><span class="is-m-hidden">代码语言:</span>javascript</div></div><div class="rno-markdown-code-toolbar-opt"><div class="rno-markdown-code-toolbar-copy"><i class="icon-copy"></i><span class="is-m-hidden">复制</span></div></div></div><div class="developer-code-block"><pre class="prism-token token line-numbers language-javascript"><code class="language-javascript" style="margin-left:0">Configuration
    

    @EnableAsync
    public class ThreadPoolTaskConfig {

    @Bean(&#34;threadPoolTaskExecutor&#34;)
    public ThreadPoolTaskExecutor threadPoolTaskExecutor(){
    
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        //线程池创建的核心线程数,线程池维护线程的最少数量,即使没有任务需要执行,也会一直存活
        //如果设置allowCoreThreadTimeout=true(默认false)时,核心线程会超时关闭
        executor.setCorePoolSize(16);
    

    // executor.setAllowCoreThreadTimeOut(true);

        //缓存队列(阻塞队列)当核心线程数达到最大时,新任务会放在队列中排队等待执行
        executor.setQueueCapacity(1024);
    
        //最大线程池数量,当线程数&gt;=corePoolSize,且任务队列已满时。线程池会创建新线程来处理任务
        //当线程数=maxPoolSize,且任务队列已满时,线程池会拒绝处理任务而抛出异常
        executor.setMaxPoolSize(64);
        //当线程空闲时间达到keepAliveTime时,线程会退出,直到线程数量=corePoolSize
        //允许线程空闲时间60秒,当maxPoolSize的线程在空闲时间到达的时候销毁
        //如果allowCoreThreadTimeout=true,则会直到线程数量=0
        executor.setKeepAliveSeconds(30);
    
        executor.setThreadNamePrefix(&#34;joseph的自定义线程池&#34;);
    
    
        //拒绝策略
        // rejection-policy:当pool已经达到max size的时候,如何处理新任务
        // CallerRunsPolicy():交由调用方线程运行,比如 main 线程;如果添加到线程池失败,那么主线程会自己去执行该任务,不会等待线程池中的线程去执行
        //AbortPolicy():该策略是线程池的默认策略,如果线程池队列满了丢掉这个任务并且抛出RejectedExecutionException异常。
        //DiscardPolicy():如果线程池队列满了,会直接丢掉这个任务并且不会有任何异常
        //DiscardOldestPolicy():丢弃队列中最老的任务,队列满了,会将最早进入队列的任务删掉腾出空间,再尝试加入队列
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
    
        executor.initialize();
    
        return executor;
    
    }
    

    }

    好,我们异步的时候

    直接指定线程池

    @Async(“threadPoolTaskExecutor”)就可以了

    好,那么当请求量很多的时候,就会引发问题

    导致OOM,服务器重启,是会丢失的!,或者说,阻塞队列你设置的短一些的话,任务也是会丢失的(拒绝策略的话)

    意思就是,我们不能单单的把任务堆到队列就好了,是会有消息丢失的风险的!

    对的,就是提高线程处理任务的速度!

    消费方角度提高

    当涉及网络请求时,spring中使用的是restTempate

    这是spring基于httpClient提供的一个http请求工具,我们异步发短信调用第三方服务,用到restTemplate,但是restTemplate在spring中有个坑,

    • 底层通过使用java.net包下的实现创建HTTP 请求
    • 通过使用ClientHttpRequestFactory指定不同的HTTP请求方式,主要提供了两种实现方式
      • SimpleClientHttpRequestFactory(默认)
        • 底层使用J2SE提供的方式,既java.net包提供的方式,创建底层的Http请求连接
        • 主要createRequest 方法( 断点调试),每次都会创建一个新的连接,每次都创建连接会造成极大的资源浪费,而且若连接不能及时释放,会因为无法建立新的连接导致后面的请求阻塞
      • HttpComponentsClientHttpRequestFactory
        • 底层使用HttpClient访问远程的Http服务

    这就导致,每次请求都会三次握手,非常的耗时,

    当请求处理不过来,客户端等待过长,主动断掉连接,就会报错

    错误Caused by: java.io.IOException: Broken pipe

    • 服务端向前端socket连接管道写返回数据时 链接(pipe)却断开了

    我们可以通过http连接池的方式,来服用建立 的连接,来加快线程处理的速度

    进而加快阻塞队列的消费

    对httpclient进行封装的有:Apache的Fluent、es的restHighLevelClient、spring的restTemplate等

    这几个都可以进行http连接池的封装!

    下面来看一下,如何封装

    代码语言:javascript
    复制
    @Configuration
    public class RestTemplateConfig {

    @Bean
    public RestTemplate restTemplate(ClientHttpRequestFactory requestFactory){
        return new RestTemplate(requestFactory);
    }
    
    @Bean
    public ClientHttpRequestFactory httpRequestFactory(){
        return new HttpComponentsClientHttpRequestFactory(httpClient());
    }
    
    @Bean
    public HttpClient httpClient(){
        Registry&lt;ConnectionSocketFactory&gt; registry = RegistryBuilder.&lt;ConnectionSocketFactory&gt;create()
                .register(&#34;http&#34;, PlainConnectionSocketFactory.getSocketFactory())
                .register(&#34;https&#34;, SSLConnectionSocketFactory.getSocketFactory())
                .build();
    
        PoolingHttpClientConnectionManager connectManager = new PoolingHttpClientConnectionManager(registry);
    
        //设置连接池最大是500个链接
        connectManager.setMaxTotal(500);
    
        //maxPerToute对maxTotal细分 每个主机最大并发是300  route是指域名
        connectManager.setDefaultMaxPerRoute(300);
    
        RequestConfig requestConfig = RequestConfig.custom()
                //返回数据超时时间
                .setSocketTimeout(20000)
                //连胜服务器超时时间
                .setConnectTimeout(10000)
                //从连接池中获取连接的超时时间
                .setConnectionRequestTimeout(1000)
                .build();
    
        CloseableHttpClient closeableHttpClient = HttpClientBuilder.create().setDefaultRequestConfig(requestConfig)
                .setConnectionManager(connectManager)
                .build();
        return closeableHttpClient;
    }
    //优化前
    

    // @Bean
    // public ClientHttpRequestFactory simpleClientHttpRequestFactory(){
    // SimpleClientHttpRequestFactory factory = new SimpleClientHttpRequestFactory();
    // factory.setReadTimeout(10000);
    // factory.setConnectTimeout(10000);
    // return factory;
    // }
    }

    现在,要涉及网络调用,异步提高性能的话

    就没有任何问题了!

    image-20230728224819957

    总结就是,先发现异步请求在压测的时候,会导致OOM,然后再自定义线程池,但是处理太慢,有超时报错和http管道报错,我们又更换httpClient,自定义http连接池,最终完成了qps的提高