并发容器和线程池

当你的线程需要执行一个后继任务,即完成每个前置任务后,会自动执行下一个任务。这时我们使用CompletableFuture 来实现。

CompletableFuture 是一个异步任务编排、调度框架,以更优雅的方式实现组合式异步编程。

ps:如果程序调用某个方法,等待其执行全部处理后才能继续执行,我们称其为同步的。相反,在处理完成之前就返回调用方法则是异步的。
我们在编程语言的流程中添加了异步控制的部分,这部分的编程可以称之为异步编程

使用如例:

public class StudentIDTest {
  public static void main(String[] args) {
    // 构建学生集合
    List<Student> studentList = new ArrayList<>();
    for (int i = 1; i <= 10; i++) {
      Student s = new Student();
      s.setName("学生" + i);
      studentList.add(s);
    }

    Register reg = new Register();

    studentList.forEach(s -> {
      CompletableFuture.supplyAsync(
          // 每个学生都注册学号
          () -> reg.regId(s)
        )
        // 学号注册完毕后,打印欢迎消息
        .thenAccept(student -> {
          System.out.println("你好 " + student.getName() + ", 欢迎来到春蕾中学大家庭");
        });
    });

    System.out.println("mission complate");
  }
}

CompletableFuture.supplyAsync() 方法运行一个异步任务并且返回结果,所以 regId()

Register 没有实现 Runnable 接口,但系统会自动优化:把作为 supplyAsync() 方法参数的整个 () -> reg.regId(s) 表达式语句包装在另一个对象中;这个对象也是 JDK 内置的,它实现了 Runnable

实际上 supplyAsync() 方法的作用是:在一个单独的线程中执行 reg.regId(s)语句,本质上是多线程编程

之后使用 thenAccept() 方法完成后继的任务步骤。thenAccept() 方法的参数(student)就是前置任务的返回结果,系统会在前一个任务完成后,自动执行 student -> {} 后继任务。所以本质上,后继任务也是多线程方式执行的。thenAccept() 方法通常用于任务链的末尾。

CompletableFuture 与 数据流 Stream parallelStream() 十分相似,两者都是多线程并发编程,都可以被称为并发容器,不同的是前者任务比较宽泛,而后者侧重流的元素的计算操作。

supplyAsync() 用于开头thenAccept() 用于末尾,各自调用一次即可。中间有多个步骤,可以调用多次 thenApply() 。由于末尾也要用到 


拓展

supplyAsync() 是静态方法,返回值是 CompletableFuture 实例对象,再调用 thenApply() 或 thenAccept() 实例方法,返回的也是 CompletableFuture实例对象

所以,虽然整条语句是连写的,其实也可以定义返回值。

CompletableFuture<Void> cf = CompletableFuture.supplyAsync(() -> reg.regId(s))
  .thenApply(student -> {
    return dis.assignClasses(student);
  })
  .thenAccept(student -> {
     System.out.println("姓名:" + student.getName() + ",学号:" + student.getId() + ",班级号:" + student.getClassId());
  });

返回的是仍然是 CompletableFuture 实例对象,所以定义变量的类型就是 CompletableFuture 。但可以用泛型 CompletableFuture<> 表示其中包含的数据具体是什么类型。

因为本案例末尾调用了 thenAccept(),其 Lambda 表达式没有 return 语句,表示 CompletableFuture 实例对象不包含数据,所以泛型写为 CompletableFuture<Void>

Void 的完整写法是 java.lang.Void,是 void 关键字的包装类。表示没有类型,等同于 null

返回 CompletableFuture 类型

如果没有调用 thenAccept() 方法,以 thenApply() 或 supplyAsync() 结尾的话,例如代码:

CompletableFuture.supplyAsync(() -> reg.regId(s))
  .thenApply(student -> {
    return dis.assignClasses(student);
  });

因为 thenApply() 的 Lambda 表达式返回的是 Student 对象,所以 CompletableFuture 实例对象包含的是 Student 数据,于是泛型写为 CompletableFuture<Student>

CompletableFuture<Student> cf = CompletableFuture.supplyAsync(() -> reg.regId(s))
  .thenApply(student -> {
    return dis.assignClasses(student);
  });

这几个方法返回的是 CompletableFuture 实例,但其中包含什么类型的数据取决于 Lambda 表达式返回值的类型,如果没有返回值,则用 <Void> 表示。

3、扩展知识点:main() 方法的问题

目前我们的程序,都是通过 main() 方法执行的。如果学生人数较多,例如 2000 个,所有注册线程的运行就没有那么快完毕了。

问题是,可能线程任务还没执行完毕,main() 方法就执行完毕,导致程序运行结束退出了。

看到这里,大家可以在自己电脑上运行程序,执行 2000 个甚至一万个学生注册。观察出现的现象。

要解决这个问题,返回值就有用了。我们先把每个学生的入学任务实例对象(CompletableFuture<Void>),收集起来(装入集合),然后等待所有的线程执行完毕。

List<CompletableFuture> cfs = new ArrayList<>();
studentList.forEach(s -> {
  CompletableFuture<Void> cf = CompletableFuture.supplyAsync(() -> reg.regId(s))
    .thenApply(student -> {
        return dis.assignClasses(student);
    }).thenAccept(student -> {
        System.out.println("姓名:" + student.getName() + ",学号:" + student.getId() + ",班级号:" + student.getClassId());
    });

  cfs.add(cf);
});

try {
  // 等待所有的线程执行完毕
  CompletableFuture.allOf(cfs.toArray(new CompletableFuture[] {})).get();
} catch (Exception e) {
  e.printStackTrace();
}

CompletableFuture.allOf() 是静态方法,的作用就是收集所有的任务实例对象。因为 allOf() 方法只支持数组不支持集合,所以需要把集合转换成数组(cfs.toArray(new CompletableFuture[] {}))。当然,你可以一开始就定义数组来收集任务实例对象,因为学生的个数可以通过 studentList.size() 取得。allOf() 方法的返回值也是 CompletableFuture 实例对象。

集合的 toArray() 方法,在 《Java面向对象》第 7 章第 1 节有讲到,不熟悉的同学可以复习一下。

再调用类方法 get(),其作用就是等待所有的任务线程(allOf() 收集的)都执行完毕,再继续执行(本案例 main() 方法后面没代码了,就退出程序)。

运行一下:

需要强调的是

在 SpringBoot 等服务端运行 supplyAsync() 异步任务编排的时候,就没有必要可以使用 get() 方法等待所有线程任务执行完毕了。因为服务端往往是常驻程序,不像 main() 方法执行完毕就退出程序了。


安全的布尔值包装类

Java除了提供能够以原子的方式操作整数的 AtomicInteger 也为布尔值提供了原子操作方式 AtomicBoolean

AtomicBoolean 是 boolean 的包装类,AtomicBoolean 的实例等同于一个布尔值:

  • new AtomicBoolean(true) 等同于 true
  • new AtomicBoolean(false) 等同于 false

取得布尔值

实例对象取得基础类型的布尔值,可以调用 get() 方法:

AtomicBoolean ab = new AtomicBoolean(true);
boolean value = ab.get();

实例对象调用 compareAndSet() 方法,就能以原子的方式修改值:

true 改为 false

compareAndSet(true, false) 判断当前值为 true 时,修改为 false,然后返回成功或失败。这是三个步骤哦。

  • 修改成功后,方法返回 true 。
  • 如果当前值不是 true ,则不修改,返回值为 false,表示操作失败

compareAndSet() 实际上就是保证了整个修改操作的三个步骤的原子性,不会因为多线程出现错乱。

false 改为 true

compareAndSet(false, true) 判断当前值为 false 时,修改为 true,然后返回成功或失败

  • 修改成功后,方法返回 true 。
  • 如果当前值不是 false ,则不修改,返回值为 false,表示操作失败

再次强调compareAndSet() 方法返回值表示修改操作成功或失败,跟方法参数值无关。


线程池

使用 Runnable 接口开发多线程程序,更符合面向对象的习惯,但也会导致对象太多的问题,比如学生系统,如果每个学生都分配一个线程线程就意味着需要分配数千甚至近万的线程Thread对象,消耗计算机资源,这就需要我们复用 Thread对象即使用线程池

线程池就像一个池子,装满了线程,随用随取,线程可以被复用,一个线程可以执行A任务,也可以执行B任务,于是线程不再频繁创建和销毁。

new Thread(register) 意味着一个线程对象只能执行一个任务,而线程池让线程与任务分离,不再紧密绑定

同时线程池不是无限大的,里面存在的线程数也是有限的,这意味着能同时运行的任务数是有限的,其他过剩的任务就需要排队,等待有空闲的线程再执行。

核心代码:

public class StudentIDTest {

  // 线程工厂
  private static final ThreadFactory namedThreadFactory = new BasicThreadFactory.Builder()
    .namingPattern("studentReg-pool-%d")
    .daemon(true)
    .build();

  // 等待队列
  private static final BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>(1024);

  // 线程池服务
  private static final ThreadPoolExecutor EXECUTOR_SERVICE = new ThreadPoolExecutor(
        20,
        200,
        30,
        TimeUnit.SECONDS,
        workQueue,
        namedThreadFactory,
        new ThreadPoolExecutor.AbortPolicy()
      );

  public static void main(String[] args) {

  }
}

这里的 BasicThreadFactory 需要一个依赖库

<dependency>
  <groupId>org.apache.commons</groupId>
  <artifactId>commons-lang3</artifactId>
  <version>3.10</version>
</dependency>

1.创建线程工厂实例:

new BasicThreadFactory.Builder()
  .namingPattern("studentReg-pool-%d")
  .daemon(true)
  .build();

这里的namingPattern()定义了线程名称的格式,即线程名称模板

2.创建线程等待队列实例

线程池没有空闲的线程时,其它的任务,就需要在队列中等待。

可以类比一下:春运期间坐火车的人太多,火车站候车大厅容量有限,很多乘客就在候车大厅外排队等候。

如果机器性能好,CPU 核数多(6、8核)、内存大,队列可以大一些:new LinkedBlockingQueue<Runnable>(2048)。构造函数的参数表示能排队的任务个数。

如果机器性能好,CPU 核数少(1、2核)、内存大,队列就小一些:new LinkedBlockingQueue<Runnable>(512)

一般来说,new LinkedBlockingQueue<Runnable>(1024)也还可以

3.创建线程池实例

ThreadPoolExecutor 构造函数参数较多,七个参数按顺序说明如下:

参数序号解释
1线程池初始化核心线程数量,一般是两位数,通常不大
2线程池最大线程数,计算机性能强就大一些,否则小一些,通常不超过 200
3线程池中的线程数超过核心线程数时,如果一段时间后还没有任务指派,就回收了。想立即回收就填 0,一般 30
4第三个参数的时间单位。30 + TimeUnit.SECONDS 表示 30 秒
5等待队列实例,已经创建过了
6线程工厂实例,已经创建过了
7任务太多,超过队列的容量时,用什么样的策略处理。一般用 AbortPolicy 表示拒绝,让主程序自己处理

多线程编程特别需要注意的问题的是:防止线程数过多把系统搞崩溃。所以用线程池可以做更加精确的控制,否则难以控制、无法保证稳定。

实际编程工作中,要想办法保证不要创建太多的任务,要有所控制,而不是只管创建任务扔进线程池。比如可以采用分页的思想,分批处理。一批只处理几十个、一二百个任务。还要考虑任务执行时间,能不能快速结束。不要让一台计算机堆积太多任务,保证线程等待队列能容纳。

这些容量大小的各个参数值,在学习阶段,都不用纠结。例子中的够用了。

实际工作中,根据公司的服务器的状况,选择合适的数值,到时候跟公司经验丰富的同事请教、沟通即可。