跳至主要內容

JUC10 - 线程池2

codejavajuc约 3222 字大约 11 分钟

线程池2

使用 Executors 创建线程池 (工具类 Executors)

除了我们自己创建线程池之外,官方也提供了很多的线程池定义

可以使用Executors工具类来快速创建线程池

固定线程量的线程池 - newFixedThreadPool()

固定线程量的线程池,本质就是一个 ThreadPoolExecutor() 方法

public static void main(String[] args) throws InterruptedException {
    ExecutorService executor = Executors.newFixedThreadPool(2);   
    // 直接创建一个固定容量的线程池
}

对应的内部实现为:

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

这里直接将最大线程和核心线程数量设定为一样的

并且等待时间为0,因为压根不需要,不会存在非核心的线程,因为要么在执行(核心线程),要么就是在等待队列里

采用的是一个无界的LinkedBlockingQueue作为等待队列, 默认容量为 Integer.MAX_VALUE

可以看到 return 是一个 ThreadPoolExecutor, 但在函数返回的类型是 ExecutorService,不过没事,因为 ThreadPoolExecutorExecutorService 的一个实现类

只有一个线程的线程池 - newSingleThreadExecutor()

使用newSingleThreadExecutor来创建只有一个线程的线程池:

public static void main(String[] args) throws InterruptedException {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    // 创建一个只有一个线程的线程池
}
源码分析

原理如下:

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

可以看到这里并不是直接创建的一个ThreadPoolExecutor对象,而是套了一层FinalizableDelegatedExecutorService

FinalizableDelegatedExecutorService
  • Finalizable: 指它重写了 Object.finalize() 方法(或使用了类似的清理机制),确保在对象被垃圾回收(GC)时能自动执行某些清理操作。

  • Delegated: 指它使用了委派模式(Delegation Pattern)。它本身不实现线程池逻辑,而是内部持有一个真正的线程池对象,并将所有操作转发给它。

static class FinalizableDelegatedExecutorService
    extends DelegatedExecutorService {
    FinalizableDelegatedExecutorService(ExecutorService executor) {
        super(executor);
    }
    protected void finalize() {    
        // 在GC时,会执行finalize方法,此方法中会关闭掉线程池,释放资源
        super.shutdown();
    }
}

对应的 DelegatedExecutorService 构造方法:

static class DelegatedExecutorService extends AbstractExecutorService {
    // 被委派对象
    private final ExecutorService e;
    DelegatedExecutorService(ExecutorService executor) { e = executor; }   //实际上所以的操作都是让委派对象执行的,有点像代理
    public void execute(Runnable command) { e.execute(command); }
    public void shutdown() { e.shutdown(); }
    public List<Runnable> shutdownNow() { return e.shutdownNow(); }
}

所以,下面两种写法的区别在于:

public static void main(String[] args) throws InterruptedException {
    ExecutorService executor1 = Executors.newSingleThreadExecutor();
    ExecutorService executor2 = Executors.newFixedThreadPool(1);
}

前者实际上是被代理了,我们没办法直接修改前者的相关属性,显然使用前者创建只有一个线程的线程池更加专业和安全(可以防止属性被修改)一些。

全非核心线程池 - newCachedThreadPool()

最后我们来看newCachedThreadPool方法:

public static void main(String[] args) throws InterruptedException {
    ExecutorService executor = Executors.newCachedThreadPool();
    //它是一个会根据需要无限制创建新线程的线程池
}

我们来看看它的实现:

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

可以看到,核心线程数为0,那么也就是说所有的线程都是非核心线程,也就是说线程空闲时间超过1秒钟,一律销毁。但是它的最大容量是Integer.MAX_VALUE,也就是说,它可以无限制地增长下去,所以这玩意一定要慎用。

执行带返回值的任务 Future<>

一个多线程任务不仅仅可以是void无返回值任务,比如我们现在需要执行一个任务,但是我们需要在任务执行之后得到一个结果,这个时候怎么办呢?

ExecutorService

public interface ExecutorService extends Executor {

    void shutdown();

    List<Runnable> shutdownNow();

    boolean isShutdown();

    boolean isTerminated();

    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;

    <T> Future<T> submit(Callable<T> task);

    <T> Future<T> submit(Runnable task, T result);

    Future<?> submit(Runnable task);

    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;

    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;

    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;

    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

在使用自带的 Executors 工具类创建对应线程池时,可以看到其使用的是接口 ExecutorService,其执行任务的方法从 execute() 转为 submit()

并且可以看到在 ExecutorService 的源码里有大量的返回为 Future<T> 的泛型

Future

接受任务返回的结果,可以使用Future

它可以返回任务的计算结果,我们可以通过它来获取任务的结果以及任务当前是否完成

RunnableCallable

Runnable 是没有返回值,Callable是有返回值

public static void main(String[] args) throws InterruptedException, ExecutionException {
    // 直接用Executors创建
    ExecutorService executor = Executors.newSingleThreadExecutor();

    // 使用submit提交任务,会返回一个Future对象
    // 注意提交的对象可以是Runable也可以是Callable
    // 这里使用的是Callable能够自定义返回值
    Future<String> future = executor.submit(() -> "我是字符串!");
    
    // 如果任务未完成,get会被阻塞
    // 任务完成返回Callable执行结果返回值
    System.out.println(future.get());
    executor.shutdown();
}

当然结果也可以一开始就定义好,然后等待Runnable执行完之后再返回:

public static void main(String[] args) throws InterruptedException, ExecutionException {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    Future<String> future = executor.submit(() -> {
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }, "我是字符串!");
    System.out.println(future.get());
    executor.shutdown();
}
FutureTask

还可以通过传入FutureTask对象的方式:

public static void main(String[] args) throws ExecutionException, InterruptedException {
    ExecutorService service = Executors.newSingleThreadExecutor();
    FutureTask<String> task = new FutureTask<>(() -> "我是字符串!");
    service.submit(task);
    System.out.println(task.get());
    executor.shutdown();
}

还可以通过Future对象获取当前任务的一些状态:

public static void main(String[] args) throws ExecutionException, InterruptedException {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    Future<String> future = executor.submit(() -> "都看到这里了,不赏UP主一个一键三连吗?");
    System.out.println(future.get());
    System.out.println("任务是否执行完成:"+future.isDone());
    System.out.println("任务是否被取消:"+future.isCancelled());
    executor.shutdown();
}

我们来试试看在任务执行途中取消任务:

public static void main(String[] args) throws ExecutionException, InterruptedException {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    Future<String> future = executor.submit(() -> {
        TimeUnit.SECONDS.sleep(10);
        return "这次一定!";
    });
    System.out.println(future.cancel(true));
    System.out.println(future.isCancelled());
    executor.shutdown();
}

Future构造函数分析

FutureTask即继承了 RunnableFuture<V>, 接受 CallableRunnable

其构造方法会把Runnable的参数转为Callable

public class FutureTask<V> implements RunnableFuture<V> {
    ...
    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }
    
    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }
    ...
}

public interface RunnableFuture<V> extends Runnable, Future<V> {
    /**
     * Sets this Future to the result of its computation
     * unless it has been cancelled.
     */
    void run();
}

此外,发现其将 Runnable 转换为 Callable

public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;       // ensure visibility of callable
}

本质是调用了 Executors.callable(runnable, result);

Executors.callable

对应实现为

public static <T> Callable<T> callable(Runnable task, T result) {
    if (task == null)
        throw new NullPointerException();
    return new RunnableAdapter<T>(task, result);
}

实际上就是返回了一个 RunnableAdapter<T> 的类,他实现了 Callable 接口,因此上面就定义类型是 Callable

RunnableAdapter<T>
static final class RunnableAdapter<T> implements Callable<T> {
    final Runnable task;
    final T result;
    RunnableAdapter(Runnable task, T result) {
        this.task = task;
        this.result = result;
    }
    public T call() {
        task.run();
        return result;
    }
}

定义了 RunnableAdapter<T>call 方法就行了

Callable<T>Runnable

Callable:

@FunctionalInterface
public interface Callable<V> {
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return computed result
     * @throws Exception if unable to compute a result
     */
    V call() throws Exception;
}

Runnable:

@FunctionalInterface
public interface Runnable {
    /**
     * When an object implementing interface <code>Runnable</code> is used
     * to create a thread, starting the thread causes the object's
     * <code>run</code> method to be called in that separately executing
     * thread.
     * <p>
     * The general contract of the method <code>run</code> is that it may
     * take any action whatsoever.
     *
     * @see     java.lang.Thread#run()
     */
    public abstract void run();
}

执行定时任务 ScheduledThreadPoolExecutor

既然线程池怎么强大,那么线程池能不能执行定时任务呢?

我们之前如果需要执行一个定时任务,那么肯定会用到TimerTimerTask,但是它只会创建一个线程处理我们的定时任务,无法实现多线程调度,并且它无法处理异常情况一旦抛出未捕获异常那么会直接终止,显然我们需要一个更加强大的定时器。

JDK5之后,我们可以使用ScheduledThreadPoolExecutor来提交定时任务,它继承自ThreadPoolExecutor

构造函数

所有的构造方法都必须要求最大线程池容量为Integer.MAX_VALUE,采用的都是DelayedWorkQueue作为等待队列

public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}

public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue(), threadFactory);
}

public ScheduledThreadPoolExecutor(int corePoolSize,
                                   RejectedExecutionHandler handler) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue(), handler);
}

public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory,
                                   RejectedExecutionHandler handler) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue(), threadFactory, handler);
}

示例

我们来测试一下它的方法,这个方法可以提交一个延时任务,只有到达指定时间之后才会开始:

public static void main(String[] args) throws ExecutionException, InterruptedException {
    // 直接设定核心线程数为1
    ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
    // 计划在3秒后执行
    executor.schedule(() -> System.out.println("HelloWorld!"), 3, TimeUnit.SECONDS);

    executor.shutdown();
}
可以接受返回值 ScheduledFuture

我们也可以像之前一样,传入一个Callable对象,用于接收返回值:

public static void main(String[] args) throws ExecutionException, InterruptedException {
    ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(2);
    
    // 这里使用ScheduledFuture
    ScheduledFuture<String> future = executor.schedule(() -> "????", 3, TimeUnit.SECONDS);
    
    System.out.println("任务剩余等待时间:"+future.getDelay(TimeUnit.MILLISECONDS) / 1000.0 + "s");
    
    System.out.println("任务执行结果:"+future.get());
    executor.shutdown();
}

可以看到schedule方法返回了一个ScheduledFuture对象,和Future一样,它也支持返回值的获取、包括对任务的取消同时还支持获取剩余等待时间。

重复执行任务

那么如果我们希望按照一定的频率不断执行任务呢?

public static void main(String[] args) throws ExecutionException, InterruptedException {
    ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(2);
    executor.scheduleAtFixedRate(() -> System.out.println("Hello World!"),
            3, 1, TimeUnit.SECONDS);
    // 三秒钟延迟开始,之后每隔一秒钟执行一次
}
.scheduleAtFixedRate()

其对应参数表示 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)

第二个是初始延迟,第三个为间隔时间

该方法以固定的时间间隔执行任务,而不关心上一次任务执行了多久。

  • 从任务开始执行的那一刻起,就开始计算下一次执行的时间。

  • 适用于对执行频率有严格要求的任务(例如:每隔 1 小时同步一次数据)。

  • 如果任务执行的时间超过了设定的周期(period),下一次任务会在当前任务结束后立即开始,而不会并发执行。

/**
 * @throws RejectedExecutionException {@inheritDoc}
 * @throws NullPointerException       {@inheritDoc}
 * @throws IllegalArgumentException   {@inheritDoc}
 */
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
    long initialDelay,
    long period,
    TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    if (period <= 0)
        throw new IllegalArgumentException();
    ScheduledFutureTask<Void> sft =
        new ScheduledFutureTask<Void>(command,
                                        null,
                                        triggerTime(initialDelay, unit),
                                        unit.toNanos(period));
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    sft.outerTask = t;
    delayedExecute(t);
    return t;
}
.scheduleWithFixedDelay()

该方法在任务结束和下一次任务开始之间保持固定的延迟。

  • 只有当任务完全执行完毕后,才开始计算延迟时间(delay)。

  • 适用于需要确保两次任务之间有足够资源缓冲或休息时间的场景(例如:清理日志、扫描数据库)。

  • 实际的执行频率是 任务耗时 + 设定延迟

public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
    long initialDelay,
    long delay,
    TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    if (delay <= 0)
        throw new IllegalArgumentException();
    ScheduledFutureTask<Void> sft =
        new ScheduledFutureTask<Void>(command,
                                        null,
                                        triggerTime(initialDelay, unit),
                                        unit.toNanos(-delay));
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    sft.outerTask = t;
    delayedExecute(t);
    return t;
}
分析

导致不同的是因为 unit.toNanos(period)unit.toNanos(-delay)

然后就会进 delayedExecute() 延迟执行

ScheduledThreadPoolExecutor 使用的不是普通的 LinkedBlockingQueue,而是一个定制的 DelayedWorkQueue

基于小顶堆(Min-Heap)结构的优先级队列 | 排序依据: 任务的 time 属性(即下一次执行的纳秒时间点)

队列头部永远是那个“最快到期”的任务。线程池里的线程会去取这个头部的任务,如果时间还没到,线程就会进入等待状态。

private void delayedExecute(RunnableScheduledFuture<?> task) {
    if (isShutdown())
        reject(task);
    else {
        super.getQueue().add(task);
        if (isShutdown() &&
            !canRunInCurrentRunState(task.isPeriodic()) &&
            remove(task))
            task.cancel(false);
        else
            ensurePrestart();
    }
}

如果任务执行,会调用 ScheduledThreadPoolExecutor 的内部类 ScheduledFutureTaskrun 方法

/**
 * 位于 ScheduledThreadPoolExecutor.ScheduledFutureTask 类中
 */
public void run() {
    // 1. 判断是否是周期性任务 (period != 0)
    boolean periodic = isPeriodic();

    // 2. 如果当前状态下不能运行,则取消任务
    if (!canRunInCurrentRunState(periodic))
        cancel(false);
    // 3. 如果不是周期性任务,直接调用父类的 run (只运行一次)
    else if (!periodic)
        ScheduledFutureTask.super.run();
    
    // 4. 【重点】如果是周期性任务,先执行任务
    else if (ScheduledFutureTask.super.runAndReset()) {
        // 执行成功后,设置下一次执行的时间
        setNextRunTime();
        // 将任务重新放回队列,等待下次调度
        reExecutePeriodic(outerTask);
    }
}

private void setNextRunTime() {
    long p = period;
    if (p > 0)
        // 【Fixed Rate】
        // 下次时间 = 上次计划的时间 + 周期
        time += p; 
    else
        // 【Fixed Delay】
        // 下次时间 = 当前时间(now) + 延迟(取绝对值)
        // triggerTime 内部会做: System.nanoTime() + (-p)
        // p 是负数 ———— -p 正数
        time = triggerTime(-p); 
}
long triggerTime(long delay) {
    // 预防性保险
    // 如果你传入的延迟超过了 Long 最大值的一半,那么 now() + delay 就有极高的风险超过 Long.MAX_VALUE,导致结果变成负数(溢出)
    return now() +
        ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}

Executors对应工具类

Executors也为我们预置了newScheduledThreadPool方法用于创建线程池:

public static void main(String[] args) throws ExecutionException, InterruptedException {
    ScheduledExecutorService service = Executors.newScheduledThreadPool(1);
    service.schedule(() -> System.out.println("Hello World!"), 1, TimeUnit.SECONDS);
}
上次编辑于: