跳至主要內容

JUC 八股22 - 线程池3

codejavajuc八股约 1827 字大约 6 分钟

线程池

IO 密集 / CPU 密集

任务类型线程状态CPU 使用多线程效果
CPU 密集型RUNNABLE可能更慢(切换开销)
IO 密集型WAITING更快(等待时间重叠)

IO 操作时线程在等待,CPU 空闲

// 读文件的过程
String data = readFile();  // 线程状态:WAITING
                           // CPU:几乎不占用
                           // 时间:大部分在等磁盘

思考

数据读取的过程

磁盘/网卡自己干活,不需要 CPU 参与:

线程发起读取请求
       ↓
CPU 发指令给磁盘控制器,然后线程进入等待
       ↓
磁盘控制器(DMA)把数据读到内存缓冲区   ← 不占用 CPU
       ↓
磁盘控制器通知 CPU:读完了
       ↓
线程被唤醒,数据已经在内存里了
// Java 代码
FileInputStream fis = new FileInputStream("data.txt");
byte[] data = new byte[1024];
fis.read(data);  // 线程等待,但不是 CPU 在读

// 实际过程:
// 1. CPU 发指令给磁盘控制器
// 2. CPU 切换去执行其他线程
// 3. 磁盘控制器通过 DMA 把数据直接写入内存
// 4. 完成后中断通知 CPU
// 5. 线程被唤醒,data 数组里已经有数据了

CPU 只需要发完这个指令,表示我要读取数据了,就可以切换到别的线程

那我写入了内存,但是线程此时被CPU切换掉了,等轮到他的时候,怎么知道数据咋样了

线程不会"检查"数据,是被操作系统"唤醒"的

整个流程:

1. 线程发起 read() 请求
       ↓
2. 线程状态变为 WAITING,被移出 CPU 调度队列
       ↓
3. CPU 切换到其他线程
       ↓
4. DMA 把数据写入内存缓冲区
       ↓
5. 磁盘控制器发送"中断信号"给 CPU
       ↓
6. 操作系统把线程状态改为 RUNNABLE,放回调度队列
       ↓
7. CPU 调度到这个线程,read() 返回,数据已经在变量里

线程池调优

首先我会根据任务类型设置核心线程数参数

比如 IO 密集型任务会设置为 CPU 核心数*2, CPU 密集型任务则设置为 CPU 核心数 + 1

其次我会结合线程池动态调整的能力,在流量波动时通过 setCorePoolSize 平滑扩容

结合监控或定时任务自动调整,通过监控指标(队列长度、活跃线程数)判断是否需要调整,然后调用 setCorePoolSize

比如如果工作队列挤压严重,那么就扩容,并且限制最大容量;

如果工作队列美元四超过五分钟就缩小容量

或者直接使用 DynamicTp 实现线程池参数的自动化调整

最后,我会通过内置的监控指标建立容量预警机制。比如通过 JMX 监控线程池的运行状态,设置阈值,当线程池的任务队列长度超过阈值时,触发告警

如何知道你设置的线程数多了还是少了

可以通过监控和调试来判断线程数是多还是少

通过 top 命令观察 CPU 的使用率,如果 CPU 使用率较低,可能是线程数过少;如果 CPU 使用率接近 100%,但吞吐量未提升,可能是线程数过多

然后再通过 VisualVM 或 Arthas 分析线程运行情况,查看线程的状态、等待时间、运行时间等信息

VisualVM: JDK 自带的图形化监控工具,可以看线程状态 jvisualvm

Arthas: 阿里开源的在线诊断工具,不用重启应用

可以使用 jstack 命令查看线程堆栈信息,查看线程是否处于阻塞状态

如果有大量的 BLOCKED 线程,说明线程数可能过多,竞争比较激烈

线程池如何实现参数的动态修改

线程池提供的 setter 方法就可以在运行时动态修改参数,比如说 setCorePoolSize 可以用来修改核心线程数、setMaximumPoolSize 可以用来修改最大线程数

需要注意的是,调用 setCorePoolSize() 时如果新的核心线程数比原来的大,线程池会创建新的线程;如果更小,线程池不会立即销毁多余的线程,除非有空闲线程超过 keepAliveTim

当然了,还可以利用 Nacos 配置中心,或者实现自定义的线程池,监听参数变化去动态调整参数

线程设计的注意点

第一个,选择合适的线程池大小。过小的线程池可能会导致任务一直在排队;过大的线程池可能会导致大家都在竞争 CPU 资源,增加上下文切换的开销

第二个,选择合适的任务队列。使用有界队列可以避免资源耗尽的风险,但是可能会导致任务被拒绝;使用无界队列虽然可以避免任务被拒绝,但是可能会导致内存耗尽

比如在使用 LinkedBlockingQueue 的时候,可以传入参数来限制队列中任务的数量,这样就不会出现 OOM

第三个,尽量使用自定义的线程池,而不是使用 Executors 创建的线程池。

因为 newFixedThreadPool 线程池由于使用了 LinkedBlockingQueue,队列的容量默认无限大,任务过多时会导致内存溢出;newCachedThreadPool 线程池由于核心线程数无限大,当任务过多的时候会导致创建大量的线程,导致服务器负载过高宕机。

手动设计线程池

线程池的主要目的是为了避免频繁地创建和销毁线程,池化

把线程池看作一个工厂,里面有一群“工人”,也就是线程了,专门用来做任务。

当任务来了,需要先判断有没有空闲的工人,如果有就把任务交给他们;如果没有,就把任务暂存到一个任务队列里,等工人忙完了再去处理。

如果队列满了,还没有空闲的工人,就要考虑扩容,让预备的工人过来干活,但不能超过预定的最大值,防止工厂被挤爆。

如果连扩容也没法解决,就需要一个拒绝策略,可能直接拒绝任务或者报个错。

class CustomThreadPoolExecutor {
  private final int corePoolSize;
  private final int maximumPoolSize;
  private final long keepAliveTime;
  private final TimeUnit unit;
  private final BlockingQueue<Runnable> workQueue;
  private final RejectedExecutionHandler handler;

  private volatile boolean isShutdown = false;
  private int currentPoolSize = 0;

  // 构造方法
  public CustomThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
      this.corePoolSize = corePoolSize;
      this.maximumPoolSize = maximumPoolSize;
      this.keepAliveTime = keepAliveTime;
      this.unit = unit;
      this.workQueue = workQueue;
      this.handler = handler;
  }

  // 提交任务
  public void execute(Runnable task) {
      if (isShutdown) {
          throw new IllegalStateException("ThreadPool is shutdown");
      }

      synchronized (this) {
          // 如果当前线程数小于核心线程数,直接创建新线程
          if (currentPoolSize < corePoolSize) {
              new Worker(task).start();
              currentPoolSize++;
              return;
          }

          // 尝试将任务添加到队列中
          if (!workQueue.offer(task)) {
              if (currentPoolSize < maximumPoolSize) {
                  new Worker(task).start();
                  currentPoolSize++;
              } else {
                  // 调用拒绝策略
                  handler.rejectedExecution(task, null);
              }
          }
      }
  }

  // 关闭线程池
  public void shutdown() {
      isShutdown = true;
  }

  // 工作线程
  private class Worker extends Thread {
      private Runnable task;

      Worker(Runnable task) {
          this.task = task;
      }

      @Override
      public void run() {
          while (task != null || (task = getTask()) != null) {
              try {
                  task.run();
              } finally {
                  task = null;
              }
          }
      }

      // 从队列中获取任务
      private Runnable getTask() {
          try {
              return workQueue.poll(keepAliveTime, unit);
          } catch (InterruptedException e) {
              return null;
          }
      }
  }
}
上次编辑于: