目录

1.CompletableFuture是干什么的?

2.常用的方法runAsync 和 supplyAsync方法

3.计算结果完成时的回调方法 (whenComplete和whenCompleteAsync )

4.thenApply()和thenCompose()

持续更新中。。。。。。

1.CompletableFuture是干什么的?

CompletableFuture是Java 8中新增的异步编程工具,它提供了一种方便的编程模型,用于解决异步编程中的困难和复杂度。使用CompletableFuture,可以将一些异步或并发任务放到后台线程或线程池中执行,同时允许你定义当任务完成时要执行的代码块。

其他异步编程工具(可以帮助开发者更加高效地处理多线程并发情况):

CompletableFuture:CompletableFuture是Java8中新增的异步编程工具,可以极大地简化异步编程的复杂度、提高程序效率和性能,并且支持函数式编程风格,可以使代码更加简洁易读。 ReactiveX:ReactiveX是一种跨平台的异步编程框架,支持多种编程语言和平台,包括Java、JavaScript、C++、Python等,应用广泛。它通过使用Observable和Observer来实现异步编程,具有高效并发、简洁易读、响应式编程等优势。 Akka:Akka是一种基于Actor模型的异步编程框架,主要用于构建高并发、分布式的应用程序,支持多种编程语言,包括Java、Scala等。它通过实现Actor模型来实现异步消息处理、并发管理、容错机制等功能,具有高效、稳定、可扩展、易维护等优点。 Netty:Netty是一种Java异步网络编程框架,可以帮助开发者更加高效地处理TCP、UDP、HTTP等协议的网络编程,支持异步I/O操作、线程池、事件驱动等机制,可以提高程序的并发性和性能。 RxJava:RxJava是一种异步编程库,用于Java虚拟机中支持响应式编程。它支持异步处理、异步I/O、流式处理等特性,可以在处理大量数据的情况下快速响应,并且具有高效、简单、可维护等优点。

这些异步编程工具,各有特点,可以根据实际开发需求和技术选型合理选择使用。在使用时需要注意避免出现死锁和线程安全等问题,保证程序的性能和稳定性。

2.常用的方法runAsync 和 supplyAsync方法

CompletableFuture 提供了四个静态方法来创建一个异步操作。

public static CompletableFuture runAsync(Runnable runnable)

public static CompletableFuture runAsync(Runnable runnable, Executor executor)

public static CompletableFuture supplyAsync(Supplier supplier)

public static CompletableFuture supplyAsync(Supplier supplier, Executor executor)

没有指定Executor的方法会使用ForkJoinPool.commonPool() 作为它的线程池执行异步代码。如果指定线程池,则使用指定的线程池运行。以下所有的方法都类同。

runAsync方法不支持返回值。supplyAsync可以支持返回值。

示例:

public class Test13 {

// public static CompletableFuture runAsync(Runnable runnable)

// public static CompletableFuture runAsync(Runnable runnable, Executor executor)

// public static CompletableFuture supplyAsync(Supplier supplier)

// public static CompletableFuture supplyAsync(Supplier supplier, Executor executor)

// runAsync方法不支持返回值。

// supplyAsync可以支持返回值。

public static void main(String[] args) throws InterruptedException, ExecutionException {

runAsyncTest();

System.out.println( supplyAsyncTest());

}

public static void runAsyncTest() throws InterruptedException {

CompletableFuture.runAsync(()->{

System.out.println("=========================start=================");

try {

Thread.sleep(500);

} catch (InterruptedException e) {

e.printStackTrace();

}

System.out.println("=========================end=================");

});

System.out.println("1111111111111111111111111111");

// Thread.sleep(500);

}

public static String supplyAsyncTest() throws InterruptedException, ExecutionException {

CompletableFuture stringCompletableFuture= CompletableFuture.supplyAsync(()->{

System.out.println("=========================supplyAsyncTest-start=================");

try {

Thread.sleep(500);

} catch (InterruptedException e) {

e.printStackTrace();

}

System.out.println("=========================supplyAsyncTest-end=================");

return "okokk";

});

System.out.println("222222222222222222222222222222222");

return stringCompletableFuture.get();

}

}

3.计算结果完成时的回调方法 (whenComplete和whenCompleteAsync )

当CompletableFuture的计算结果完成,或者抛出异常的时候,可以执行特定的Action。主要是下面的方法:

public CompletableFuture whenComplete(BiConsumer action)

public CompletableFuture whenCompleteAsync(BiConsumer action)

public CompletableFuture whenCompleteAsync(BiConsumer action, Executor executor)

public CompletableFuture exceptionally(Function fn)

可以看到Action的类型是BiConsumer它可以处理正常的计算结果,或者异常情况。

whenComplete 和 whenCompleteAsync 的区别:

whenComplete方法会在原线程中执行回调操作,whenCompleteAsync则会在异步线程中执行回调操作,一般是在公共的ForkJoinPool中。

package com.onlyqi.test07;

import java.util.concurrent.CompletableFuture;

import java.util.function.BiConsumer;

public class Test14 {

//whenComplete:是执行当前任务的线程执行继续执行 whenComplete 的任务。

//whenCompleteAsync:是执行把 whenCompleteAsync 这个任务继续提交给线程池来进行执行。

public static void main(String[] args) throws InterruptedException {

// whenCompleteTest();

whenCompleteAsyncTest();

Thread.sleep(1500);

}

public static void whenCompleteTest() throws InterruptedException {

CompletableFuture future = CompletableFuture.supplyAsync(() -> {

System.out.println("================whenComplete-start======================"+Thread.currentThread());

try {

Thread.sleep(1000);

} catch (InterruptedException e) {

e.printStackTrace();

}

System.out.println("================whenComplete-end======================");

return "ojbk";

});

future.whenComplete(new BiConsumer() {

@Override

public void accept(String s, Throwable throwable) {

System.out.println("whenComplete执行完成"+Thread.currentThread());

}

});

System.out.println("11111111111111111111111111111111111111");

}

public static void whenCompleteAsyncTest() throws InterruptedException {

CompletableFuture future = CompletableFuture.runAsync(() -> {

System.out.println("================whenCompleteAsync -start======================"+Thread.currentThread());

try {

Thread.sleep(1000);

} catch (InterruptedException e) {

e.printStackTrace();

}

System.out.println("================whenCompleteAsync -end======================");

});

future.whenCompleteAsync(new BiConsumer() {

@Override

public void accept(Void t, Throwable action) {

System.out.println("whenCompleteAsync执行完成"+Thread.currentThread());

}

});

System.out.println("22222222222222222222222222222222222222222222");

}

}

4.thenApply()和thenCompose()

依赖关系:

thenApply():把前面任务的执行结果,交给后面的FunctionthenCompose():用来连接两个有依赖关系的任务,结果由第二个任务返回

thenApply接收一个函数作为参数,使用该函数处理上一个CompletableFuture调用的结果,并返回一个具有处理结果的Future对象。

thenCompose的参数为一个返回CompletableFuture实例的函数,该函数的参数是先前计算步骤的结果。

将上一段任务的执行结果作为下一阶段任务的入参参与重新计算,产生新的结果。 thenApply

thenApply接收一个函数作为参数,使用该函数处理上一个CompletableFuture调用的结果,并返回一个具有处理结果的Future对象。

public CompletableFuture thenApply(Function fn)

public CompletableFuture thenApplyAsync(Function fn)

public CompletableFuture thenCompose(Function> fn);

public CompletableFuture thenComposeAsync(Function> fn) ;

 

示例:

package com.onlyqi.test07;

import java.util.Random;

import java.util.concurrent.CompletableFuture;

import java.util.concurrent.CompletionStage;

import java.util.function.Function;

import java.util.function.Supplier;

public class Test15 {

public static void main(String[] args) {

thenApplyTest();

thenComposeTest();

}

public static void thenApplyTest(){

CompletableFuture future = CompletableFuture.supplyAsync(() -> {

int result = 100;

System.out.println("第一次运算:" + result);

return result;

}).thenApply(number -> {

int result = number * 3;

System.out.println("第二次运算:" + result);

return result;

});

}

public static void thenComposeTest(){

CompletableFuture future = CompletableFuture

.supplyAsync(new Supplier() {

@Override

public Integer get() {

int number = 6;

System.out.println("第一次运算:" + number);

return number;

}

})

.thenCompose(new Function>() {

@Override

public CompletionStage apply(Integer param) {

return CompletableFuture.supplyAsync(new Supplier() {

@Override

public Integer get() {

int number = param * 2;

System.out.println("第二次运算:" + number);

return number;

}

});

}

});

}

}

 

持续更新中。。。。。。

参考文档:

CompletableFuture使用详解(全网看这一篇就行)_代码搬运工阿新的博客-CSDN博客CompletableFuture使用详解_sermonlizhi的博客-CSDN博客CompletableFuture使用详解(全网看这一篇就行)_代码搬运工阿新的博客-CSDN博客

精彩文章

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: