FutureCompletableFuture实战
1. Callable&Future&FutureTask介绍
直接继承Thread或者实现Runnable接口都可以创建线程,但是这两种方法都有一个问题就是:没有返回值,也就是不能获取执行完的结果。因此java1.5就提供了Callable接口来实现这一场景,而Future和FutureTask就可以和Callable接口配合起来使用。
@FunctionalInterface
public interface Runnable {public abstract void run();
}
@FunctionalInterface
public interface Callable<V> {V call() throws Exception;
}
Runnable 的缺陷:
- 不能返回一个返回值
- 不能抛出 checked Exception
Callable的call方法可以有返回值,可以声明抛出异常。和 Callable 配合的有一个 Future 类,通过 Future 可以了解任务执行情况,或者取消任务的执行,还可获取任务执行的结果,这些功能都是 Runnable 做不到的,Callable 的功能要比 Runnable 强大。
new Thread(new Runnable() {@Overridepublic void run() {System.out.println("通过Runnable方式执行任务");}
}).start();FutureTask task = new FutureTask(new Callable() {@Overridepublic Object call() throws Exception {System.out.println("通过Callable方式执行任务");Thread.sleep(3000);return "返回任务结果";}
});
new Thread(task).start();
System.out.println(task.get());
1.1 Future 的API
Future就是对于具体的Runnable或者Callable任务的执行结果进行取消、查询是否完成、获取结果。必要时可以通过get方法获取执行结果,该方法会阻塞直到任务返回结果。
- boolean cancel (boolean mayInterruptIfRunning) 取消任务的执行。参数指定是否立即中断任务执行,或者等等任务结束
- boolean isCancelled () 任务是否已经取消,任务正常完成前将其取消,则返回 true
- boolean isDone () 任务是否已经完成。需要注意的是如果任务正常终止、异常或取消,都将返回true
- V get () throws InterruptedException, ExecutionException 等待任务执行结束,然后获得V类型的结果。InterruptedException 线程被中断异常, ExecutionException任务执行异常,如果任务被取消,还会抛出CancellationException
- V get (long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException 同上面的get功能一样,多了设置超时时间。参数timeout指定超时时间,uint指定时间的单位,在枚举类TimeUnit中有相关的定义。如果计算超时,将抛出TimeoutException
1.2 FutureTask 使用
Future实际采用FutureTask实现,该对象相当于是消费者和生产者的桥梁,消费者通过 FutureTask 存储任务的处理结果,更新任务的状态:未开始、正在处理、已完成等。而生产者拿到的 FutureTask 被转型为 Future 接口,可以阻塞式获取任务的处理结果,非阻塞式获取任务处理状态。
FutureTask既可以被当做Runnable来执行,也可以被当做Future来获取Callable的返回结果。
把 Callable 实例当作 FutureTask 构造函数的参数,生成 FutureTask 的对象,然后把这个对象当作一个 Runnable 对象,放到线程池中或另起线程去执行,最后还可以通过 FutureTask 获取任务执行的结果。
public class FutureTaskDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {Task task = new Task();//构建futureTaskFutureTask<Integer> futureTask = new FutureTask<>(task);//作为Runnable入参new Thread(futureTask).start();System.out.println("task运行结果:"+futureTask.get());}static class Task implements Callable<Integer> {@Overridepublic Integer call() throws Exception {System.out.println("子线程正在计算");int sum = 0;for (int i = 0; i < 100; i++) {sum += i;}return sum;}}
}
使用案例:促销活动中商品信息查询
在维护促销活动时需要查询商品信息(包括商品基本信息、商品价格、商品库存、商品图片、商品销售状态等)。这些信息分布在不同的业务中心,由不同的系统提供服务。如果采用同步方式,假设一个接口需要50ms,那么一个商品查询下来就需要200ms-300ms,这对于我们来说是不满意的。如果使用Future改造则需要的就是最长耗时服务的接口,也就是50ms左右。
public class FutureTaskDemo2 {public static void main(String[] args) throws ExecutionException, InterruptedException {FutureTask<String> ft1 = new FutureTask<>(new T1Task());FutureTask<String> ft2 = new FutureTask<>(new T2Task());FutureTask<String> ft3 = new FutureTask<>(new T3Task());FutureTask<String> ft4 = new FutureTask<>(new T4Task());FutureTask<String> ft5 = new FutureTask<>(new T5Task());//构建线程池ExecutorService executorService = Executors.newFixedThreadPool(5);executorService.submit(ft1);executorService.submit(ft2);executorService.submit(ft3);executorService.submit(ft4);executorService.submit(ft5);//获取执行结果System.out.println(ft1.get());System.out.println(ft2.get());System.out.println(ft3.get());System.out.println(ft4.get());System.out.println(ft5.get());executorService.shutdown();}static class T1Task implements Callable<String> {@Overridepublic String call() throws Exception {System.out.println("T1:查询商品基本信息...");TimeUnit.MILLISECONDS.sleep(50);return "商品基本信息查询成功";}}static class T2Task implements Callable<String> {@Overridepublic String call() throws Exception {System.out.println("T2:查询商品价格...");TimeUnit.MILLISECONDS.sleep(50);return "商品价格查询成功";}}static class T3Task implements Callable<String> {@Overridepublic String call() throws Exception {System.out.println("T3:查询商品库存...");TimeUnit.MILLISECONDS.sleep(50);return "商品库存查询成功";}}static class T4Task implements Callable<String> {@Overridepublic String call() throws Exception {System.out.println("T4:查询商品图片...");TimeUnit.MILLISECONDS.sleep(50);return "商品图片查询成功";}}static class T5Task implements Callable<String> {@Overridepublic String call() throws Exception {System.out.println("T5:查询商品销售状态...");TimeUnit.MILLISECONDS.sleep(50);return "商品销售状态查询成功";}}}
1.3 Future的局限性
从本质上说,Future表示一个异步计算的结果。它提供了isDone()来检测计算是否已经完成,并且在计算结束后,可以通过get()方法来获取计算结果。在异步计算中,Future确实是个非常优秀的接口。但是,它的本身也确实存在着许多限制:
- 并发执行多任务:Future只提供了get()方法来获取结果,并且是阻塞的。所以,除了等待你别无他法;
- 无法对多个任务进行链式调用:如果你希望在计算任务完成后执行特定动作,比如发邮件,但Future却没有提供这样的能力;
- 无法组合多个任务:如果你运行了10个任务,并期望在它们全部执行结束后执行特定动作,那么在Future中这是无能为力的;
- 没有异常处理:Future接口中没有关于异常处理的方法;
2. CompletableFuture使用详解
简单的任务,用Future获取结果还好,但我们并行提交的多个异步任务,往往并不是独立的,很多时候业务逻辑处理存在串行[依赖]、并行、聚合的关系。如果要我们手动用 Future 实现,是非常麻烦的。
CompletableFuture是Future接口的扩展和增强。CompletableFuture实现了Future接口,并在此基础上进行了丰富地扩展,完美地弥补了Future上述的种种问题。更为重要的是,CompletableFuture实现了对任务的编排能力。借助这项能力,我们可以轻松地组织不同任务的运行顺序、规则以及方式。从某种程度上说,这项能力是它的核心能力。而在以往,虽然通过CountDownLatch等工具类也可以实现任务的编排,但需要复杂的逻辑处理,不仅耗费精力且难以维护。
2.1 应用场景
描述依赖关系:
- thenApply() 把前面异步任务的结果,交给后面的Function
- thenCompose()用来连接两个有依赖关系的任务,结果由第二个任务返回
描述and聚合关系:
- thenCombine:任务合并,有返回值
- thenAccepetBoth:两个任务执行完成后,将结果交给thenAccepetBoth消耗,无返回值。
- runAfterBoth:两个任务都执行完成后,执行下一步操作(Runnable)。
描述or聚合关系:
- applyToEither:两个任务谁执行的快,就使用那一个结果,有返回值。
- acceptEither: 两个任务谁执行的快,就消耗那一个结果,无返回值。
- runAfterEither: 任意一个任务执行完成,进行下一步操作(Runnable)。
并行执行:
CompletableFuture类自己也提供了anyOf()和allOf()用于支持多个CompletableFuture并行执行
2.2 创建异步操作
CompletableFuture 提供了四个静态方法来创建一个异步操作:
public static CompletableFuture<Void> runAsync(Runnable runnable)public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
这四个方法区别在于:
- runAsync 方法以Runnable函数式接口类型为参数,没有返回结果,supplyAsync 方法Supplier函数式接口类型为参数,返回结果类型为U;Supplier 接口的 get() 方法是有返回值的(会阻塞)
- 没有指定Executor的方法会使用ForkJoinPool.commonPool() 作为它的线程池执行异步代码。如果指定线程池,则使用指定的线程池运行。
- 默认情况下 CompletableFuture 会使用公共的 ForkJoinPool 线程池,这个线程池默认创建的线程数是 CPU 的核数(也可以通过 JVM option:-Djava.util.concurrent.ForkJoinPool.common.parallelism 来设置 ForkJoinPool 线程池的线程数)。如果所有 CompletableFuture 共享一个线程池,那么一旦有任务执行一些很慢的 I/O 操作,就会导致线程池中所有线程都阻塞在 I/O 操作上,从而造成线程饥饿,进而影响整个系统的性能。所以,强烈建议你要根据不同的业务类型创建不同的线程池,以避免互相干扰
runAsync&supplyAsync
Runnable runnable = () -> System.out.println("执行无返回结果的异步任务");CompletableFuture.runAsync(runnable);CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {System.out.println("执行有返回值的异步任务");try {Thread.sleep(5000);} catch (InterruptedException e) {e.printStackTrace();}return "Hello World";});String result = future.get();System.out.println(result);
执行无返回结果的异步任务
执行有返回值的异步任务
2.3 获取结果
join&get
join()和get()方法都是用来获取CompletableFuture异步之后的返回值。join()方法抛出的是uncheck异常(即未经检查的异常),不会强制开发者抛出。get()方法抛出的是经过检查的异常,ExecutionException, InterruptedException 需要用户手动处理(抛出或者 try catch)
2.4 结果处理
当CompletableFuture的计算结果完成,或者抛出异常的时候,我们可以执行特定的 Action。主要是下面的方法:
public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)
- Action的类型是BiConsumer,它可以处理正常的计算结果,或者异常情况。
- 方法不以Async结尾,意味着Action使用相同的线程执行,而Async可能会使用其它的线程去执行(如果使用相同的线程池,也可能会被同一个线程选中执行)。
- 这几个方法都会返回CompletableFuture,当Action执行完毕后它的结果返回原始的CompletableFuture的计算结果或者返回异常
whenComplete&exceptionally
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {}if (new Random().nextInt(10) % 2 == 0) {int i = 12 / 0;}System.out.println("执行结束!");return "test";});future.whenComplete(new BiConsumer<String, Throwable>() {@Overridepublic void accept(String t, Throwable action) {System.out.println(t+" 执行完成!");}});future.exceptionally(new Function<Throwable, String>() {@Overridepublic String apply(Throwable t) {System.out.println("执行失败:" + t.getMessage());return "异常xxxx";}}).join();
执行结束!
test 执行完成!
或者
执行失败:java.lang.ArithmeticException: / by zero
null 执行完成!
2.5 结果转换
所谓结果转换,就是将上一段任务的执行结果作为下一阶段任务的入参参与重新计算,产生新的结果。
thenApply
thenApply 接收一个函数作为参数,使用该函数处理上一个CompletableFuture 调用的结果,并返回一个具有处理结果的Future对象。
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {int result = 100;System.out.println("一阶段:" + result);return result;}).thenApply(number -> {int result = number * 3;System.out.println("二阶段:" + result);return result;});System.out.println("最终结果:" + future.get());
一阶段:100
二阶段:300
最终结果:300
thenCompose
thenCompose 的参数为一个返回 CompletableFuture 实例的函数,该函数的参数是先前计算步骤的结果。
public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn);public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn) ;public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn, Executor executor) ;
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {int number = new Random().nextInt(30);System.out.println("第一阶段:" + number);return number;}}).thenCompose(new Function<Integer, CompletionStage<Integer>>() {@Overridepublic CompletionStage<Integer> apply(Integer param) {return CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {int number = param * 2;System.out.println("第二阶段:" + number);return number;}});}});System.out.println("最终结果: " + future.get());
第一阶段:10
第二阶段:20
最终结果:20
thenApply 和 thenCompose的区别
- thenApply转换的是泛型中的类型,并返回一个新的封装了转换结果的
CompletableFuture实例;
- thenCompose 将内部的 CompletableFuture 调用展开来并使用上一个CompletableFutre 调用的结果在下一步的 CompletableFuture 调用中进行运算,是生成一个新的CompletableFuture。
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello");CompletableFuture<String> result1 = future.thenApply(param -> param + " World");CompletableFuture<String> result2 = future.thenCompose(param -> CompletableFuture.supplyAsync(() -> param + " World"));System.out.println(result1.get());System.out.println(result2.get());
Hello World
Hello World
2.6 结果消费
与结果处理和结果转换系列函数返回一个新的 CompletableFuture 不同,结果消费系列函数只对结果执行Action,而不返回新的计算值。
根据对结果的处理方式,结果消费函数又分为:
- thenAccept系列:对单个结果进行消费
- thenAcceptBoth系列:对两个结果进行消费
- thenRun系列:不关心结果,只对结果执行Action
thenAccept
通过观察该系列函数的参数类型可知,它们是函数式接口Consumer,这个接口只有输入,没有返回值。
public CompletionStage<Void> thenAccept(Consumer<? super T> action);public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {int number = new Random().nextInt(10);System.out.println("第一阶段:" + number);return number;}).thenAccept(number ->System.out.println("第二阶段:" + number * 5));System.out.println("最终结果:" + future.get());
第一阶段:8
第二阶段:40
最终结果:null
thenAcceptBoth
thenAcceptBoth 函数的作用是,当两个 CompletionStage 都正常完成计算的时候,就会执行提供的action消费两个异步的结果。
public <U> CompletionStage<Void> thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action, Executor executor);
CompletableFuture<Integer> futrue1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {int number = new Random().nextInt(3) + 1;try {TimeUnit.SECONDS.sleep(number);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("第一阶段:" + number);return number;}});CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {int number = new Random().nextInt(3) + 1;try {TimeUnit.SECONDS.sleep(number);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("第二阶段:" + number);return number;}});futrue1.thenAcceptBoth(future2, new BiConsumer<Integer, Integer>() {@Overridepublic void accept(Integer x, Integer y) {System.out.println("最终结果:" + (x + y));}}).join();
第二阶段:1
第一阶段:2
最终结果:3
thenRun
thenRun 也是对线程任务结果的一种消费函数,与thenAccept不同的是,thenRun 会在上一阶段 CompletableFuture 计算完成的时候执行一个Runnable,Runnable并不使用该 CompletableFuture 计算的结果。
public CompletionStage<Void> thenRun(Runnable action);public CompletionStage<Void> thenRunAsync(Runnable action);public CompletionStage<Void> thenRunAsync(Runnable action,Executor executor);
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {int number = new Random().nextInt(10);System.out.println("第一阶段:" + number);return number;}).thenRun(() ->System.out.println("thenRun 执行"));System.out.println("最终结果:" + future.get());
第一阶段:2
thenRun 执行
最终结果:null
2.7 结果组合
thenCombine
thenCombine 方法,合并两个线程任务的结果,并进一步处理。
public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn,Executor executor);
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {int number = new Random().nextInt(10);System.out.println("第一阶段:" + number);return number;}});CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {int number = new Random().nextInt(10);System.out.println("第二阶段:" + number);return number;}});CompletableFuture<Integer> result = future1.thenCombine(future2, new BiFunction<Integer, Integer, Integer>() {@Overridepublic Integer apply(Integer x, Integer y) {return x + y;}});System.out.println("最终结果:" + result.get());
第一阶段:9
第二阶段:5
最终结果:14
2.8 任务交互
所谓线程交互,是指将两个线程任务获取结果的速度相比较,按一定的规则进行下一步处理。
applyToEither
两个线程任务相比较,先获得执行结果的,就对该结果进行下一步的转化操作。
public <U> CompletionStage<U> applyToEither(CompletionStage<? extends T> other,Function<? super T, U> fn);public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn);public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn,Executor executor);
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {int number = new Random().nextInt(10);System.out.println("第一阶段start:" + number);try {TimeUnit.SECONDS.sleep(number);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("第一阶段end:" + number);return number;}});CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {int number = new Random().nextInt(10);System.out.println("第二阶段start:" + number);try {TimeUnit.SECONDS.sleep(number);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("第二阶段end:" + number);return number;}});future1.applyToEither(future2, new Function<Integer, Integer>() {@Overridepublic Integer apply(Integer number) {System.out.println("最快结果:" + number);return number * 2;}}).join();
第一阶段start:6
第二阶段start:5
第二阶段end:5
最快结果:5
acceptEither
两个线程任务相比较,先获得执行结果的,就对该结果进行下一步的消费操作。
public CompletionStage<Void> acceptEither(CompletionStage<? extends T> other,Consumer<? super T> action);public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action);public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action,Executor executor);
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {int number = new Random().nextInt(10) + 1;try {TimeUnit.SECONDS.sleep(number);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("第一阶段:" + number);return number;}});CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {int number = new Random().nextInt(10) + 1;try {TimeUnit.SECONDS.sleep(number);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("第二阶段:" + number);return number;}});future1.acceptEither(future2, new Consumer<Integer>() {@Overridepublic void accept(Integer number) {System.out.println("最快结果:" + number);}}).join();
第二阶段:3
最快结果:3
runAfterEither
两个线程任务相比较,有任何一个执行完成,就进行下一步操作,不关心运行结果。
public CompletionStage<Void> runAfterEither(CompletionStage<?> other,Runnable action);public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action);public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action,Executor executor);
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {int number = new Random().nextInt(5);try {TimeUnit.SECONDS.sleep(number);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("第一阶段:" + number);return number;}});CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {int number = new Random().nextInt(5);try {TimeUnit.SECONDS.sleep(number);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("第二阶段:" + number);return number;}});future1.runAfterEither(future2, new Runnable() {@Overridepublic void run() {System.out.println("已经有一个任务完成了");}}).join();"); } }).join();
第一阶段:3
已经有一个任务完成了
runAfterBoth
两个线程任务相比较,两个全部执行完成,才进行下一步操作,不关心运行结果。
public CompletionStage<Void> runAfterBoth(CompletionStage<?> other,Runnable action);public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action);public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action,Executor executor);
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("第一阶段:1");return 1;}});CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("第二阶段:2");return 2;}});future1.runAfterBoth(future2, new Runnable() {@Overridepublic void run() {System.out.println("上面两个任务都执行完成了。");}}).get();
第一阶段:1
第二阶段:2
上面两个任务都执行完成了。
anyOf
anyOf 方法的参数是多个给定的 CompletableFuture,当其中的任何一个完成时,方法返回这个 CompletableFuture。
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
Random random = new Random();CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(random.nextInt(5));} catch (InterruptedException e) {e.printStackTrace();}return "hello";});CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(random.nextInt(1));} catch (InterruptedException e) {e.printStackTrace();}return "world";});CompletableFuture<Object> result = CompletableFuture.anyOf(future1, future2);System.out.println(result.get());
world
allOf
allOf方法用来实现多 CompletableFuture 的同时返回。
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("future1完成!");return "future1完成!";});CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {System.out.println("future2完成!");return "future2完成!";});CompletableFuture<Void> combindFuture = CompletableFuture.allOf(future1, future2);try {combindFuture.get();} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}System.out.println("future1: " + future1.isDone() + ",future2: " + future2.isDone());
future2完成!
future1完成!
future1: true,future2: true
2.9 使用案例:实现最优的“烧水泡茶”程序
著名数学家华罗庚先生在《统筹方法》这篇文章里介绍了一个烧水泡茶的例子,文中提到最优的工序应该是下面这样:
对于烧水泡茶这个程序,一种最优的分工方案:用两个线程 T1 和 T2 来完成烧水泡茶程序,T1 负责洗水壶、烧开水、泡茶这三道工序,T2 负责洗茶壶、洗茶杯、拿茶叶三道工序,其中 T1 在执行泡茶这道工序时需要等待 T2 完成拿茶叶的工序。
基于Future实现
public class FutureTaskDemo3{public static void main(String[] args) throws ExecutionException, InterruptedException {// 创建任务T2的FutureTaskFutureTask<String> ft2 = new FutureTask<>(new T2Task());// 创建任务T1的FutureTaskFutureTask<String> ft1 = new FutureTask<>(new T1Task(ft2));// 线程T1执行任务ft1Thread T1 = new Thread(ft1);T1.start();// 线程T2执行任务ft2Thread T2 = new Thread(ft2);T2.start();// 等待线程T1执行结果System.out.println(ft1.get());}}// T1Task需要执行的任务:// 洗水壶、烧开水、泡茶class T1Task implements Callable<String> {FutureTask<String> ft2;// T1任务需要T2任务的FutureTaskT1Task(FutureTask<String> ft2){this.ft2 = ft2;}@Overridepublic String call() throws Exception {System.out.println("T1:洗水壶...");TimeUnit.SECONDS.sleep(1);System.out.println("T1:烧开水...");TimeUnit.SECONDS.sleep(15);// 获取T2线程的茶叶String tf = ft2.get();System.out.println("T1:拿到茶叶:"+tf);System.out.println("T1:泡茶...");return "上茶:" + tf;}}// T2Task需要执行的任务:// 洗茶壶、洗茶杯、拿茶叶class T2Task implements Callable<String> {@Overridepublic String call() throws Exception {System.out.println("T2:洗茶壶...");TimeUnit.SECONDS.sleep(1);System.out.println("T2:洗茶杯...");TimeUnit.SECONDS.sleep(2);System.out.println("T2:拿茶叶...");TimeUnit.SECONDS.sleep(1);return "龙井";}}
基于CompletableFuture实现
public class CompletableFutureDemo2 {public static void main(String[] args) {//任务1:洗水壶->烧开水CompletableFuture<Void> f1 = CompletableFuture.runAsync(() -> {System.out.println("T1:洗水壶...");sleep(1, TimeUnit.SECONDS);System.out.println("T1:烧开水...");sleep(15, TimeUnit.SECONDS);});//任务2:洗茶壶->洗茶杯->拿茶叶CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> {System.out.println("T2:洗茶壶...");sleep(1, TimeUnit.SECONDS);System.out.println("T2:洗茶杯...");sleep(2, TimeUnit.SECONDS);System.out.println("T2:拿茶叶...");sleep(1, TimeUnit.SECONDS);return "龙井";});//任务3:任务1和任务2完成后执行:泡茶CompletableFuture<String> f3 = f1.thenCombine(f2, (__, tf) -> {System.out.println("T1:拿到茶叶:" + tf);System.out.println("T1:泡茶...");return "上茶:" + tf;});//等待任务3执行结果System.out.println(f3.join());}static void sleep(int t, TimeUnit u){try {u.sleep(t);} catch (InterruptedException e) {}}}
相关文章:
FutureCompletableFuture实战
1. Callable&Future&FutureTask介绍 直接继承Thread或者实现Runnable接口都可以创建线程,但是这两种方法都有一个问题就是:没有返回值,也就是不能获取执行完的结果。因此java1.5就提供了Callable接口来实现这一场景,而Fu…...
Amazon与Shopee平台对比:跨境卖家如何选对平台打开市场?
在跨境电商领域,选择合适的平台对卖家能否成功打开市场至关重要。如今,Amazon和Shopee成为了众多卖家的热门选择。一个以全球化布局和高端市场著称,一个则专注东南亚新兴市场的潜力。两者各有优势,但也需要根据卖家的业务模式、目…...
【项目实战】redis实现websocket分布式消息推送服务
由于redis并非专业的MQ中间件,消息的防丢失策略并不完整,存在丢失消息的可能。该方案为在再pc web管理平台的右下角弹出,显示新接收到的消息数,哪怕没有收到这个通知,也可以自己在消息中心看看。所以对可靠性要求不高。…...
(自用)配置文件优先级、SpringBoot原理、Maven私服
配置优先级 之前介绍过SpringBoot中支持三类配置文件.properties、.yml和.yaml,他们三者之间也是有着优先级顺序的,为.properties➡.yml➡.yaml。 同时SpringBoot为了增强程序的拓展性,除了支持配置文件属性配置,还支持Java系统属…...
在windows系统中使用labelimg对图片进行标注之工具安装及简单使用
一.背景 还是之前的主题,使用开源软件为公司搭建安全管理平台,从视觉模型识别安全帽开始。我是从运行、训练、标注倒过来学习的。本次主要是学习标注工具labelimg的安装及简单使用。 二.下载 LabelImg是一款广受欢迎的开源图像标注工具,为计…...
数字图像处理技术期末复习
1. 已知图像的分辨率和深度,怎么求图像的存储空间(位,字节,KB)? 题目: 已知图像的分辨率和深度,怎么求图像的存储空间(位,字节,KB)&a…...
点云空洞的边界识别提取 pso-bp 神经网络的模型来修复点云空洞 附python代码
代码是一个Python程序,用于处理3D点云数据,特别是检测和修复点云中的孔洞区域。 1. **导入库**: - `numpy`:用于数学运算。 - `open3d`:用于处理3D数据和可视化。 - `torch`:PyTorch库,用于深度学习。 - `torch.nn`和`torch.optim`:PyTorch的神经网络和优…...
【AutoDL】通过【SSH远程连接】【vscode】
小帅碎碎念 0. 起因1. SSH信息获取2. 给你的vscode安装支持SSH远程连接的插件3. SSH远程连接入口4. 输入密码登陆5. 总结 0. 起因 之前使用AutoDL和Jupyter进行代码编辑和执行确实很方便,尤其是对于交互式数据分析项目。然而,也存在一些限制和不便之处&…...
ubuntu22.04编译安装Opencv4.8.0+Opencv-contrib4.8.0教程
本章教程,主要记录在Ubuntu22.04版本系统上编译安装安装Opencv4.8.0+Opencv-contrib4.8.0的具体过程。 一、下载opencv和opencv-contrib包 wget https://github.com/opencv/opencv/archive/refs/tags/4.8.0.zip wget https://github.com/opencv/opencv_contrib/archive/refs/…...
短链接服务
一 功能描述 1.短链接是将长连接转化为短连接使得链接变得美观清爽,让用户点击率更高,同时规避原始链接中一些关键词、域名屏蔽等问题,最终利用短链每次跳转都需要经过后端的特性,在跳转过程中做异步埋点,用于效果数据…...
【Vue3学习】setup语法糖中的ref,reactive,toRef,toRefs
在 Vue 3 的组合式 API(Composition API)中,ref、reactive、toRef 和 toRefs 是四个非常重要的工具函数,用于创建和管理响应式数据。 一、ref 用ref()包裹数据,返回的响应式引用对象,包含一个 .value 属性࿰…...
Halcon中dots_image(Operator)算子原理及应用详解
在HALCON中,dots_image算子是一个用于增强图像中圆点效果的强大工具,特别适合于点的分割,以及OCR(光学字符识别)应用程序中增强点状印刷字体。以下是对dots_image (ImageResult, DotImage, 5, ‘dark’, 2)算子原理及应…...
【C语言】库函数常见的陷阱与缺陷(四):内存内容操作函数[5]--memchr
C语言中的memchr函数用于在内存块中搜索一个特定的字符(实际上是unsigned char类型的值),并返回该字符第一次出现的指针。虽然这个函数在内存搜索中非常有用,但它也存在一些陷阱。 一、功能与用法 功能:memchr函数在指定的内存块中搜索第一次出现的特定字符,并返回一个…...
【P2P】【Go】采用go语言实现udp hole punching 打洞 传输速度测试 ping测试
服务器端 udpserver/main.go package mainimport ("fmt""net""sync""sync/atomic" )var (clientCounter uint64 0 // 客户端连接计数器mu sync.Mutex )func main() {addr, err : net.ResolveUDPAddr("udp", &q…...
【附源码】Electron Windows桌面壁纸开发中的 CommonJS 和 ES Module 引入问题以及 Webpack 如何处理这种兼容
背景 在尝试让 ChatGPT 自动开发一个桌面壁纸更改的功能时,发现引入了一个 wallpaper 库,这个库的入口文件是 index.js,但是 package.json 文件下的 type:"module",这样造成了无论你使用 import from 还是 require&…...
【SpringBoot 调度任务】
在 Spring Boot 中实现调度任务(Scheduled Tasks),通过使用 EnableScheduling 和 Scheduled 注解来完成。 添加依赖启用调度任务支持创建调度任务运行应用程序 添加依赖 pom.xml 文件中有以下依赖项: <dependency><gro…...
Android v4和v7冲突
android.useAndroidXtrue android.enableJetifiertruev4转成AndroidX...
【HarmonyOS之旅】HarmonyOS开发基础知识(一)
目录 1 -> 应用基础知识 1.1 -> 用户应用程序 1.2 -> 用户应用程序包结构 1.3 -> Ability 1.4 -> 库文件 1.5 -> 资源文件 1.6 -> 配置文件 1.7 -> pack.info 1.8 -> HAR 2 -> 配置文件简介 2.1 -> 配置文件的组成 3 -> 配置文…...
【排序算法】——插入排序
目录 前言 简介 基本思想 1.直接插入排序 2.希尔排序 代码实现 1.直接插入排序 2.希尔排序 总结 1.时空复杂度 2.稳定性 尾声 前言 排序(Sorting) 是计算机程序设计中的一种重要操作,它的功能是将一个数据元素(或记录)的任意序列&…...
Vue todoList小项目记录
最初代码 简单搭一个vue2的小项目 App.vue <template><div id"app"><!-- 容器 --><div class"todo-container"><div class"todo-wrap"><!-- 头部 --><MyHeader :addTodo"addTodo"></…...
SQL题目笔记
一、根据需求创建表(设计合理的数据类型、长度)...
电脑开机提示error loading operating system怎么修复?
前一天电脑还能正常运行,但今天启动时却显示“Error loading operating system”(加载操作系统错误)。我已经仔细检查了硬盘、接线、内存、CPU和电源,确认这些硬件都没有问题。硬盘在其他电脑上可以正常使用,说明不是硬…...
Nginx 在不同操作系统下的安装指南
Nginx 在不同操作系统下的安装指南 一、Linux 系统下 Nginx 的安装 (一)基于 Ubuntu 系统 更新软件包列表 打开终端,首先执行sudo apt-get update命令。这一步是为了确保系统的软件包列表是最新的,能够获取到最新版本的 Nginx 及…...
景联文科技入选中国信通院发布的“人工智能数据标注产业图谱”
近日,由中国信息通信研究院、中国人工智能产业发展联盟牵头,联合中国电信集团、沈阳市数据局、保定高新区等70多家单位编制完成并发布《人工智能数据标注产业图谱》。景联文科技作为人工智能产业关键环节的代表企业,入选图谱中技术服务板块。…...
Nginx - 负载均衡及其配置(Balance)
一、概述 定义:在多个计算机(计算机集群)、网络连接、CPU、磁盘驱动器或其他资源中分配负载目标:最佳化资源使用、最大化吞吐率、最小化响应时间、避免过载功能:使用多台服务器提供单一服务(服务器农场&am…...
MySQL存储引擎-存储结构
Innodb存储结构 Buffer Pool(缓冲池):BP以Page页为单位,页默认大小16K,BP的底层采用链表数据结构管理Page。在InnoDB访问表记录和索引时会在Page页中缓存,以后使用可以减少磁盘IO操作,提升效率。 ○ Page根据状态可以分…...
数据资产入表 解锁智慧城市新潜力
在21世纪的科技浪潮中,智慧城市以信息技术为核心,以数据为血液,通过智能化、精细化的管理,让城市变得更加智慧、更加宜居。而数据资产入表,正是这一变革中的关键一环,它不仅推动了科技的进步,更…...
按类别调整目标检测标注框的写入顺序以优化人工审核效率
引言 在目标检测数据标注审核过程中,我们常常会遇到以下情况:某些小目标的检测框嵌套在大目标检测框内,而在模型进行预标注后,这些小目标的框可能被写入到了大目标框的下层。在人工审核阶段,标注审核人员需要手动移动…...
深入理解YOLO系列目标检测头的设定方式
目录 YOLOv1的检测头结构 1. 网络结构概述 2. 结构细节 3. 优缺点 YOLOv2的检测头结构 1. 网络结构概述 2. 结构细节 3. 优缺点 YOLOv3的检测头结构 1. 网络结构概述 2. 结构细节 3. 优缺点 总结:YOLO 系列检测头的结构演变 YOLOv1的检测头结构 1. 网络…...
智慧农业物联网解决方案:道品科技水肥一体化
在当今科技飞速发展的时代,农业也迎来了一场深刻的变革。智慧农业物联网解决方案中的水肥一体化技术,正逐渐成为现代农业发展的重要助推器。它不仅提高了农业生产效率,还实现了精准施肥和灌溉,为农业可持续发展带来了新的机遇。 …...
单片机上电后程序不运行怎么排查问题?
1.电源检查。使用电压表测量单片机的电源电压是否正常,确保电压在规定的范围内,如常见的5V。 2.复位检查。检查复位引脚的电压是否正常,在单片机接通电源时,复位引脚通常会有一个高电平,按下复位按钮时,复位…...
OceanBase 数据库分布式与集中式 能力
OceanBase分布式数据库与集中式数据库的差异 分布式数据库能解决金融行业最有挑战的高并发低延迟的核心交易系统的稳定性、扩展性、高性能问题。OB之所以一直强调分布式是说它具备很强的数据处理能力,当然从OB4.0开始也支持集中式了。 在实际业务场景中20%是分布式…...
C#多线程
C#中的多线程编程是开发高效并发应用程序的关键技术之一,它允许程序同时执行多个任务,从而提升应用程序的响应速度和性能。为了更好地理解C#中的多线程使用和定义,我们可以从以下几个方面来探讨:线程的基本概念、创建线程的方法、…...
Apache HTTP 服务器深度性能优化
引言 在前几篇文章中,我们讨论了基础和高级性能优化策略。现在,我们将深入探讨一些具体的优化实践,帮助您实现更精细的控制,并确保Apache服务器在各种复杂环境中都能保持最佳性能。 1. 细粒度的Apache配置调整 1.1 MPM参数微调…...
芯片级IO (Pad) Ring IP Checklist
SoC top顶层数字后端实现都会涉及到IO Ring (PAD Ring)的设计。这里面包括VDD IO,VDDIO IO, Signal IO, Corner IO,Filler IO,IO power cut cell等等。 数字后端零基础入门系列 | Innovus零基础LAB学习Day2 数字IC后端实现TOP F…...
无界wujie网址
文档网址:微前端是什么 | 无界 demo:https://wujie-micro.github.io/demo-main-vue/react17...
vulnhub靶场【DriftingBlues】之6
前言 靶机:DriftingBlues-6,IP地址192.168.1.63,因为重装靶机后期为192.168.1.64 攻击:kali,IP地址192.168.1.16 都采用虚拟机,网卡为桥接模式 主机发现 使用arp-scan -l或netdiscover -r 192.168.1.1…...
心情追忆- Nginx + OpenResty 构建高可用网关
之前,我独自一人开发了一个名为“心情追忆”的小程序,旨在帮助用户记录日常的心情变化及重要时刻。我从项目的构思、设计、前端(小程序)开发、后端搭建到最终部署。经过一个月的努力,通过群聊分享等方式,用…...
太速科技-527-基于3U VPX XCZU15EG+TMS320C6678的信号处理板
基于3U VPX XCZU15EGTMS320C6678的信号处理板 一、板卡概述 本板卡系我司自主研发的基于3U VPX风冷、导冷架构的信号处理板,适用于高速图像处理等。芯片采用工业级设计。 板卡采用标准3U VPX架构,板上集成一片Xilinx公司ZynqUltraScale系列F…...
Vue3源码笔记阅读1——Ref响应式原理
本专栏主要用于记录自己的阅读源码的过程,希望能够加深自己学习印象,也欢迎读者可以帮忙完善。接下来每一篇都会从定义、运用两个层面来进行解析 定义 运用 例子:模板中访问ref(1) <template><div>{{str}}</div> </template> <script> impo…...
多音轨视频使用FFmpeg删除不要音轨方法
近期给孩子找宫崎骏动画,但是有很多是多音轨视频但是默认的都是日语,电视上看没办法所以只能下载后删除音轨文件只保留中文。 方法分两步,先安装FFmpeg在转文件即可。 第一步FFmpeg安装 FFmpeg是一个开源项目,包含了处理视频的…...
AtomGit 开源生态应用开发赛报名开始啦
目录 1、赛项背景2、赛项信息3、报名链接4、赛题一:开发者原创声明(DCO)应用开发赛题要求目标核心功能 5、赛题二:基于 OpenHarmony 的开源社区应用开发简介赛题要求 6、参赛作品提交初赛阶段决赛阶段 7、参赛作品提交方式 1、赛项…...
使用 NVIDIA DALI 计算视频的光流
引言 光流(Optical Flow)是计算机视觉中的一种技术,主要用于估计视频中连续帧之间的运动信息。它通过分析像素在时间维度上的移动来预测运动场,广泛应用于目标跟踪、动作识别、视频稳定等领域。 光流的计算传统上依赖 CPU 或 GP…...
C语言学习day23:WriteProcessMemory函数/游戏内存数据修改工具开发
简言: 上一章我们说了获取应用进程的某数据(data),这一章我们就说说修改内存地址的数据。想要修改内存,那么就需要我们另一个WinAPI函数:WriteProcessMemory()函数。 WriteProcessMemory()函数 函数原型…...
利用 html_table 函数轻松获取网页中的表格数据
背景/引言 在数据爬取的过程中,网页表格数据往往是研究人员和开发者的重要目标之一。无论是统计分析、商业调研还是信息整理,表格数据的结构化特性都使其具有较高的利用价值。然而,如何快速、准确地从网页中提取表格数据始终是爬虫技术的一个…...
Postman接口测试:全局变量/接口关联/加密/解密
🍅 点击文末小卡片,免费获取软件测试全套资料,资料在手,涨薪更快 全局变量和环境变量 全局变量:在postman全局生效的变量,全局唯一 环境变量:在特定环境下生效的变量,本环境内唯一 …...
手机银行模拟器,一款高仿真银行app的模拟器,可以修改姓名 卡号 余额 做转账记录 做流水
📱手机银行模拟器让你自由定制你的金融生活。无论是流水账单、金额,还是个人信息,一切都可以按照你的意愿来模拟修改,让你体验模拟器带来的快乐! 链接:https://pan.quark.cn/s/c2f614f3447f 提取码&#…...
HT7183:16V, 4.5A的DC-DC升压转换器,常用在数码相机里
HT7183描述: HT7183是一款高功率异步升压转换器,集成120mΩ功率开关管,为便携式系统提供高效的小尺寸解决方案。具有2.6V至5.5V输入电压范围,可为各类不同供电的应用提供支持。该器件具备3A开关电流能力,并且能够提供高…...
Cobalt Strike 4.8 用户指南-第十四节 Aggressor 脚本
14.1、什么是Aggressor脚本 Aggressor Script 是Cobalt Strike 3.0版及更高版本中内置的脚本语言。Aggressor 脚本允许你修改和扩展 Cobalt Strike 客户端。 历史 Aggressor Script 是 Armitage 中开源脚本引擎Cortana的精神继承者。Cortana 是通过与 DARPA 的网络快速跟踪计…...
【Qt】QWidget中的常见属性及其功能(二)
目录 六、windowOpacity 例子: 七、cursor 例子: 八、font 九、toolTip 例子: 十、focusPolicy 例子: 十一、styleSheet 计算机中的颜色表示 例子: 六、windowOpacity opacity是不透明度的意思。 用于设…...