- 使用 Spring Initializr 创建 Spring Boot 应用程序
- 在Spring Boot中配置Cassandra
- 在 Spring Boot 上配置 Tomcat 连接池
- 将Camel消息路由到嵌入WildFly的Artemis上
Future 接口可以创建出新的线程,并在新线程中执行异步操作。但是, 如果使用 Future 接口创建了多个线程,那么这些线程各自独立执行,不存在任务依赖关系,并且无法控制各个线程的执行步骤。
为了解决这种线程间的依赖关系,从 JDK 1.8 开始,Future 接口提供了一个新的实现类 CompletableFuture。CompletableFuture 不但可以创建出异步执行的线程,还可以控制线程的执行步骤,并且可以监控所有线程的结束时刻。例如,可以使用 CompletableFuture 创建 A 和 B 两个线程,然后规定 A 和 B 线程各需要执行 3个 不同的阶段,并且当 A和 B 全部执行完毕后,再触发某个方法。
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
return asyncSupplyStage(asyncPool, supplier);
}
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
Executor executor) {
return asyncSupplyStage(screenExecutor(executor), supplier);
}
public static CompletableFuture<Void> runAsync(Runnable runnable) {
return asyncRunStage(asyncPool, runnable);
}
public static CompletableFuture<Void> runAsync(Runnable runnable,
Executor executor) {
return asyncRunStage(screenExecutor(executor), runnable);
}
}
其中,supplyAsync() 用于有返回值的任务,runAsync() 用于没有返回值的任务,
除了这些用于执行线程任务的方法外,CompletableFuture 还提供了多线程执行时的两种结束逻辑:allOf() 和 anyOf(),如下所述。
1 allOf() 方法会一直阻塞,直到线程池 Executor 中所有线程全部执行完毕。
2 anyOf() 方法会一直阻塞,直到线程池 Executor 中任何一个线程执行完毕。
有1、2、3、4 四个数字,现要求创建4个线程对这些数字进行处理,并要求每个线程必须按照以下步骤执行。
a 对4个数字的值各加上64,使之转为 A、B、C、D 对应的 ASCII(A:65、B:66、C:67、D:68)。
b 将 ASCII 值转为对应的 A、B、C、D 字符。
c 将转后的 A、B、C、D 加入一个集合中。
最后,还要求当 A、B、C、D 4个线程全部执行完毕后,打印出转换后的结果集。
package concurrent;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CompletableFutureDemo {
public static void main(String[] args) {
// 原始数据集
CopyOnWriteArrayList<Integer> taskList = new CopyOnWriteArrayList();
taskList.add(1);
taskList.add(2);
taskList.add(3);
taskList.add(4);
// 结果集
List<Character> resultList = new ArrayList<>();
//线程池,可容纳四个线程
ExecutorService executorService = Executors.newFixedThreadPool(4);
CompletableFuture[] cfs = taskList.stream()
// 第一阶段
.map(integer -> CompletableFuture.supplyAsync(
() -> calcASCII(integer), executorService)
// 第二阶段
.thenApply(i -> {
char c = (char) (i.intValue());
System.out.println("【阶段2】线程"
+ Thread.currentThread().getName() + "执行完毕,"
+ "已将int"
+ i + "转为了字符" + c);
return c;
})
// 第三阶段
.whenComplete((ch, e) -> {
resultList.add(ch);
System.out.println("【阶段3】线程" +
Thread.currentThread().getName() + "执行完毕," + "已将"
+ ch + "增加到了结果集" + resultList + "中");
executorService.shutdown();
})
).toArray(CompletableFuture[]::new);
// 封装后无返回值,必须自己whenComplete()获取
CompletableFuture.allOf(cfs).join();//future.get()
System.out.println("完成!result=" + resultList);
}
// 计算i的ASCII值
public static Integer calcASCII(Integer i) {
try {
if (i == 1) {
Thread.sleep(5000);
} else {
Thread.sleep(1000);
}
//数字 -> A-D对应的ascii
i = i + 64;
System.out.println("【阶段1】线程" + Thread.currentThread().getName()
+ "执行完毕," + "已将" + i
+ "转为了A(或B或C或D)对应的ASCII" + i);
} catch (InterruptedException e) {
e.printStackTrace();
}
return i;
}
}
【阶段1】线程pool-1-thread-3执行完毕,已将67转为了A(或B或C或D)对应的ASCII67
【阶段1】线程pool-1-thread-4执行完毕,已将68转为了A(或B或C或D)对应的ASCII68
【阶段1】线程pool-1-thread-2执行完毕,已将66转为了A(或B或C或D)对应的ASCII66
【阶段2】线程pool-1-thread-3执行完毕,已将int67转为了字符C
【阶段2】线程pool-1-thread-4执行完毕,已将int68转为了字符D
【阶段2】线程pool-1-thread-2执行完毕,已将int66转为了字符B
【阶段3】线程pool-1-thread-3执行完毕,已将C增加到了结果集[C, D]中
【阶段3】线程pool-1-thread-4执行完毕,已将D增加到了结果集[C, D]中
【阶段3】线程pool-1-thread-2执行完毕,已将B增加到了结果集[C, D, B]中
【阶段1】线程pool-1-thread-1执行完毕,已将65转为了A(或B或C或D)对应的ASCII65
【阶段2】线程pool-1-thread-1执行完毕,已将int65转为了字符A
【阶段3】线程pool-1-thread-1执行完毕,已将A增加到了结果集[C, D, B, A]中
完成!result=[C, D, B, A]
我是一名优秀的程序员,十分优秀!