CompletableFuture Java多线程操作

CompletableFuture是Java8中新增加的类,结合了Future的优点,提供了非常强大的Future的拓展功能,帮助简化了异步编程的复杂性。

其被设计在Java中进行异步编程。意味着会在主线程之外创建一个独立的线程,与主线程分隔开,并在上面运行一个非阻塞的任务,然后通知主线程成功或者失败。

实例化方式

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor);

public static CompletableFuture<Void> runAsync(Runnable runnable);
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor);

CompletableFuture 的实例化有两种格式。一种是supply开头的方法,一种是run开头的方法

  1. supply开头:这种方法,可以返回异步线程执行之后的结果
  2. run开头:这种不会返回结果,就只是执行线程任务

或者可以通过一个简单的无参构造器

CompletableFuture<String> completableFuture = new CompletableFuture<String>();

在实例化方法中,我们是可以指定Executor参数的,当我们不指定的试话,我们所开的并行线程使用的是默认系统及公共线程池ForkJoinPool,而且这些线程都是守护线程。我们在编程的时候需要谨慎使用守护线程,如果将我们普通的用户线程设置成守护线程,当我们的程序主线程结束,JVM中不存在其余用户线程,那么CompletableFuture的守护线程会直接退出,造成任务无法完成的问题。

其中supplyAsync用于有返回值的任务,runAsync则用于没有返回值的任务。Executor参数可以手动指定线程池,否则默认ForkJoinPool.commonPool()系统级公共线程池,注意:这些线程都是Daemon线程,主线程结束Daemon线程不结束,只有JVM关闭时,生命周期终止。

获取结果

同步获取结果

public T    get()
public T    get(long timeout, TimeUnit unit)
public T    getNow(T valueIfAbsent)
public T    join()

例子:

CompletableFuture<Integer> future = new CompletableFuture<>();
Integer integer = future.get();

get() 方法同样会阻塞直到任务完成,上面的代码,主线程会一直阻塞,因为这种方式创建的future从未完成。

前两个方法比较通俗易懂, getNow() 则有所区别,参数valueIfAbsent的意思是当计算结果不存在或者Now时刻没有完成任务,给定一个确定的值。

join() 与get() 区别在于join() 返回计算的结果或者抛出一个unchecked异常(CompletionException),而get() 返回一个具体的异常.

get() 方法会抛出经检查的异常,可被捕获,自定义处理或者直接抛出。
而 join() 会抛出未经检查的异常。

计算完成后续操作1——complete

public CompletableFuture<T>  whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T>  whenCompleteAsync(BiConsumer<? super T,? super Throwable>action)
public CompletableFuture<T>  whenCompleteAsync(BiConsumer<? super T,? super Throwable>action, Executor executor)
public CompletableFuture<T>  exceptionally(Function<Throwable,? extends T> fn)

方法1和2的区别在于是否使用异步处理,2和3的区别在于是否使用自定义的线程池,前三个方法都会提供一个返回结果和可抛出异常,我们可以使用lambda表达式的来接收这两个参数,然后自己处理。 方法4,接收一个可抛出的异常,且必须return一个返回值,类型与钻石表达式种的类型一样,详见下文的exceptionally() 部分,更详细

CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            return 10086;
        });
        future.whenComplete((result, error) -> {
            System.out.println("拨打"+result);
            error.printStackTrace();
        });

计算完成后续操作2——handle

public <U> CompletableFuture<U>   handle(BiFunction<? super T,Throwable,? extends U> fn)
public <U> CompletableFuture<U>   handleAsync(BiFunction<? super T,Throwable,? extends U> fn)
public <U> CompletableFuture<U>   handleAsync(BiFunction<? super T,Throwable,? extends U> fn, Executor executor)

handle方法集和上面的complete方法集没有区别,同样有两个参数一个返回结果和可抛出异常,区别就在于返回值,没错,虽然同样返回CompletableFuture类型,但是里面的参数类型,handle方法是可以自定义的。

// 开启一个异步方法
        CompletableFuture<List> future = CompletableFuture.supplyAsync(() -> {
            List<String> list = new ArrayList<>();
            list.add("语文");
            list.add("数学");
            // 获取得到今天的所有课程
            return list;
        });
        // 使用handle()方法接收list数据和error异常
        CompletableFuture<Integer> future2 = future.handle((list,error)-> {
            // 如果报错,就打印出异常
            error.printStackTrace();
            // 如果不报错,返回一个包含Integer的全新的CompletableFuture
            return list.size();
             // 注意这里的两个CompletableFuture包含的返回类型不同
        });

计算完成的后续操作3——apply

public <U> CompletableFuture<U>   thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U>   thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U>   thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)

apply方法和handle方法一样,都是结束计算之后的后续操作,唯一的不同是,handle方法会给出异常,可以让用户自己在内部处理,而apply方法只有一个返回结果,如果异常了,会被直接抛出,交给上一层处理。 如果不想每个链式调用都处理异常,那么就使用apply吧。

例子:请看下面的exceptionally() 示例

计算完成的后续操作4——accept

public CompletableFuture<Void>  thenAccept(Consumer<? super T> action)
public CompletableFuture<Void>  thenAcceptAsync(Consumer<? super T> action)
public CompletableFuture<Void>  thenAcceptAsync(Consumer<? super T> action, Executor executor)

accept() 三个方法只做最终结果的消费,注意此时返回的CompletableFuture是空返回。只消费,无返回,有点像流式编程的终端操作
例子:请看下面的exceptionally() 示例

捕获中间产生的异常——exceptionally

public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn)

exceptionally() 可以帮我们捕捉到所有中间过程的异常,方法会给我们一个异常作为参数,我们可以处理这个异常,同时返回一个默认值,跟服务降级 有点像,默认值的类型和上一个操作的返回值相同。 小贴士 :向线程池提交任务的时候发生的异常属于外部异常,是无法捕捉到的,毕竟还没有开始执行任务。exceptionally() 无法捕捉RejectedExecutionException()

// 实例化一个CompletableFuture,返回值是Integer
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            // 返回null
            return null;
        });

        CompletableFuture<String> exceptionally = future.thenApply(result -> {
            // 制造一个空指针异常NPE
            int i = result;
            return i;
        }).thenApply(result -> {
            // 这里不会执行,因为上面出现了异常
            String words = "现在是" + result + "点钟";
            return words;
        }).exceptionally(error -> {
            // 我们选择在这里打印出异常
            error.printStackTrace();
            // 并且当异常发生的时候,我们返回一个默认的文字
            return "出错啊~";
        });

        exceptionally.thenAccept(System.out::println);

    }

组合式异步编程

组合两个completableFuture

Future做不到的事:

  • 将两个异步计算合并为一个,这两个异步计算之间相互独立,同时第二个又依赖于第一个的结果。

thenApply()

thenApply()可以将上一步的结果进行下一步操作

假设一个场景,我是一个小学生,我想知道今天我需要上几门课程 此时我需要两个步骤,1.根据我的名字获取我的学生信息 2.根据我的学生信息查询课程 我们可以用下面这种方式来链式调用api,使用上一步的结果进行下一步操作

CompletableFuture<List<Lesson>> future = CompletableFuture.supplyAsync(() -> {
            // 根据学生姓名获取学生信息
            return StudentService.getStudent(name);
        }).thenApply(student -> {
            // 再根据学生信息获取今天的课程
            return LessonsService.getLessons(student);
        });

我们根据学生姓名获取学生信息,然后使用把得到的学生信息student传递到apply() 方法再获取得到学生今天的课程列表。

  • 将两个异步计算合并为一个,这两个异步计算之间相互独立,互不依赖

thenCompose()

thenCompose() 可以进行两个异步操作的值传递:

假设一个场景,我是一个小学生,今天有劳技课和美术课,我需要查询到今天需要带什么东西到学校

CompletableFuture<List<String>> total = CompletableFuture.supplyAsync(() -> {
            // 第一个任务获取美术课需要带的东西,返回一个list
            List<String> stuff = new ArrayList<>();
            stuff.add("画笔");
            stuff.add("颜料");
            return stuff;
        }).thenCompose(list -> {
            // 向第二个任务传递参数list(上一个任务美术课所需的东西list)
            CompletableFuture<List<String>> insideFuture = CompletableFuture.supplyAsync(() -> {
                List<String> stuff = new ArrayList<>();
                // 第二个任务获取劳技课所需的工具
                stuff.add("剪刀");
                stuff.add("折纸");
                // 合并两个list,获取课程所需所有工具
                List<String> allStuff = Stream.of(list, stuff)
                                .flatMap(Collection::stream).collect(Collectors.toList());
                return allStuff;
            });
            return insideFuture;
        });
        System.out.println(total.join().size());

我们通过CompletableFuture.supplyAsync() 方法创建第一个任务,获得美术课所需的物品list,然后使用thenCompose() 接口传递list到第二个任务,然后第二个任务获取劳技课所需的物品,整合之后再返回。至此我们完成两个任务的合并。 (说实话,用compose去实现这个业务场景看起来有点别扭,我们看下一个例子)

  • 将两个异步计算合并为一个,这两个异步计算之间相互独立,互不依赖

thenCombine()

thenCombine()可以将两个CompletableFuture异步计算进行组合

还是上面那个场景,我是一个小学生,今天有劳技课和美术课,我需要查询到今天需要带什么东西到学校

CompletableFuture<List<String>> painting = CompletableFuture.supplyAsync(() -> {
            // 第一个任务获取美术课需要带的东西,返回一个list
            List<String> stuff = new ArrayList<>();
            stuff.add("画笔");
            stuff.add("颜料");
            return stuff;
        });
        CompletableFuture<List<String>> handWork = CompletableFuture.supplyAsync(() -> {
            // 第二个任务获取劳技课需要带的东西,返回一个list
            List<String> stuff = new ArrayList<>();
            stuff.add("剪刀");
            stuff.add("折纸");
            return stuff;
        });
        CompletableFuture<List<String>> total = painting
                // 传入handWork列表,然后得到两个CompletableFuture的参数Stuff1和2
                .thenCombine(handWork, (stuff1, stuff2) -> {
                    // 合并成新的list
                    List<String> totalStuff = Stream.of(stuff1, stuff1)
                            .flatMap(Collection::stream)
                            .collect(Collectors.toList());
                    return totalStuff;
                });
        System.out.println(JSONObject.toJSONString(total.join()));
  • 等待Future集合中的所有任务都完成。

获取所有完成结果——allOf

public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)

allOf方法,当所有给定的任务完成后,返回一个全新的已完成CompletableFuture

CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
            try {
                //使用sleep()模拟耗时操作
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return 1;
        });

        CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
            return 2;
        });
        CompletableFuture.allOf(future1, future1);
        // 输出3
        System.out.println(future1.join()+future2.join());

获取率先完成的任务结果——anyOf

  • 仅等待Future集合种最快结束的任务完成(有可能因为他们试图通过不同的方式计算同一个值),并返回它的结果。 小贴士 :如果最快完成的任务出现了异常,也会先返回异常,如果害怕出错可以加个exceptionally() 去处理一下可能发生的异常并设定默认返回值
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            throw new NullPointerException();
        });

        CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
            try {
                // 睡眠3s模拟延时
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return 1;
        });
        CompletableFuture<Object> anyOf = CompletableFuture
                .anyOf(future, future2)
                .exceptionally(error -> {
                    error.printStackTrace();
                    return 2;
                });
        System.out.println(anyOf.join());

例子

多个方法组合使用

  • 通过编程方式完成一个Future任务的执行(即以手工设定异步操作结果的方式)。
  • 应对Future的完成时间(即当Future的完成时间完成时会收到通知,并能使用Future的计算结果进行下一步的的操作,不只是简单地阻塞等待操作的结果)
public static void main(String[] args) {
        CompletableFuture.supplyAsync(() -> 1)
                .whenComplete((result, error) -> {
                    System.out.println(result);
                    error.printStackTrace();
                })
                .handle((result, error) -> {
                    error.printStackTrace();
                    return error;
                })
                .thenApply(Object::toString)
                .thenApply(Integer::valueOf)
                .thenAccept((param) -> System.out.println("done"));
    }

循环创建并发任务

public static void main(String[] args) {
        long begin = System.currentTimeMillis();
        // 自定义一个线程池
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        // 循环创建10个CompletableFuture
        List<CompletableFuture<Integer>> collect = IntStream.range(1, 10).mapToObj(i -> {
            CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
                // 在i=5的时候抛出一个NPE
                if (i == 5) {
                    throw new NullPointerException();
                }
                try {
                    // 每个依次睡眠1-9s,模拟线程耗时
                    TimeUnit.SECONDS.sleep(i);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(i);
                return i;
            }, executorService)
                    // 这里处理一下i=5时出现的NPE
                    // 如果这里不处理异常,那么异常会在所有任务完成后抛出,小伙伴可自行测试
                    .exceptionally(Error -> {
                        try {
                            TimeUnit.SECONDS.sleep(5);
                            System.out.println(100);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        return 100;
                    });
            return future;
        }).collect(Collectors.toList());
        // List列表转成CompletableFuture的Array数组,使其可以作为allOf()的参数
        // 使用join()方法使得主线程阻塞,并等待所有并行线程完成
        CompletableFuture.allOf(collect.toArray(new CompletableFuture[]{})).join();
        System.out.println("最终耗时" + (System.currentTimeMillis() - begin) + "毫秒");
        executorService.shutdown();
    }

使用CompletableFuture场景

  • 执行比较耗时的操作时,尤其是那些依赖一个或多个远程服务的操作,使用异步任务可以改善程序的性能,加快程序的响应速度
  • 使用CompletableFuture类,它提供了异常管理的机制,让你有机会抛出、管理异步任务执行种发生的异常
  • 如果这些异步任务之间相互独立,或者他们之间的的某一些的结果是另一些的输入,你可以讲这些异步任务构造或合并成一个