網(wǎng)站建設(shè)屬于服務(wù)還是貨物推廣普通話繪畫
文章目錄
- 前言
- 1. 并發(fā)(Concurrent) 和 并行(Parallel)
- 1.1 并發(fā)的來源
- 1.2 并發(fā)技術(shù)解決了什么問題
- 2. 并行的來源
- 2.1 并行解決了什么問題
- 3. CompletableFuture 簡介
- 4. CompletableFuture 簡單應(yīng)用
- 5. CompletableFuture 工廠方法的應(yīng)用
- 6. CompletableFuture join() 方法
- 7. 使用 ParallelStream 還是 CompletableFuture
- 8. 使用 CompletableFuture 編排異步任務(wù)
- 9. CompletableFuture 響應(yīng) completion 事件
- 10. CompletableFuture 異常處理
前言
看 《Java8 實戰(zhàn)》后,覺得自己對多線程應(yīng)用還是停留在 JUC 工具類的使用上,忽略了 CompletableFuture 這么強大的工具。本文主要內(nèi)容
- 復(fù)習(xí)并行、并發(fā)的概念。
- 多線程的編程模型
- CompletableFuture 讓多線程編程更加清爽
- 有時間的話,補充 CompletableFuture 的內(nèi)部原理
1. 并發(fā)(Concurrent) 和 并行(Parallel)
Concurrent 和 Parallel 作為形容詞,并列到一起。對應(yīng)Java 的類名/方法名 也有所體現(xiàn):
- ConcurrentHashMap
- parallelStream()
1.1 并發(fā)的來源
在單核CPU的時代,根本不可能真正同時運行一個以上的線程(進程是線程的容器,Linux是把時間片分給線程)。
假設(shè)有網(wǎng)易音樂、Chrome瀏覽器這兩個應(yīng)用需要同時運行,操作系統(tǒng)會 輪流 把這兩個應(yīng)用的任務(wù)放到同一個線程上執(zhí)行。
宏觀上看,CPU把時間片分給了不同應(yīng)用,不同應(yīng)用持有單個線程某一段時間的運行權(quán)力。這就是并發(fā)技術(shù)。
1.2 并發(fā)技術(shù)解決了什么問題
在 web 技術(shù)中,同一時刻請求的接收能力提高了,具體的:
如果有耗時較長的數(shù)據(jù)庫查詢、外部資源請求,一個線程不具有并發(fā)能力則耗時操作會一直阻塞后面的請求。
2. 并行的來源
多核CPU的出現(xiàn)
2.1 并行解決了什么問題
除了壓榨硬件資源從而提高響應(yīng)速度外,還盡可能減少任務(wù)之間的并發(fā)度。因為一個CPU核心管一個任務(wù)的情況下,任務(wù)之間是隔離的,也就是線程安全的。
3. CompletableFuture 簡介
這個類是 Java 8 引入的,用于解決 Futrue
異步編程的局限性:
- Futrue 任務(wù)之間的依賴關(guān)系很難表達
- 等待Futrue集合中的所有任務(wù)都完成
- 應(yīng)對Future的完成事件
“可以說 CompletableFuture 和 Future 的關(guān)系就跟 Stream 和 Collections的關(guān)系一樣”
4. CompletableFuture 簡單應(yīng)用
- 定義一個異步任務(wù)
public Future<Double> getPriceAsync(String product) {// 用于接收異步任務(wù)的響應(yīng)CompletableFuture<Double> futurePrice = new CompletableFuture<>();// 異步任務(wù)new Thread( () -> {try {double price = calculatePrice(product);// 異步任務(wù)完成后通知(帶上返回值)futurePrice.complete(price);} catch (Exception ex) {// 異步任務(wù)有異常,也會通知調(diào)用方futurePrice.completeExceptionally(ex);}}).start();return futurePrice;
}
- 調(diào)用異步任務(wù)
Future<Double> futurePrice = shop.getPriceAsync("my favorite product");doSomething();try {double pricie = futurePrice.get();
} catch (Exception e) {throw new RuntimeException(e);
}
5. CompletableFuture 工廠方法的應(yīng)用
- getPriceAsync 可以用已有的api改寫為:
// 同樣獲得了異步處理、異常處理的能力
public Future<Double> getPriceAsync(String product) {return CompletableFuture.supplyAsync(() -> calculatePrice(product));
}
6. CompletableFuture join() 方法
- 書中用了兩個Stream,因為Stream有延時特性,寫在一起的話第一個任務(wù)提交后,會被立即join();
- 立即join的副作用就是,主線程會阻塞等待第一個任務(wù)完成后才繼續(xù)后面操作
- 進而所有線程都變成了順序執(zhí)行
- 所以需要拆成兩個Stream
// 獲取并行運算的任務(wù)列表
List<CompletableFuture<String>> priceFutures =shops.stream().map(shop -> CompletableFuture.supplyAsync(() -> shop.getPrice(product)).collect(toList()); // import 了 Collectors.toList()// 匯總并行運算的計算結(jié)果
List<String> result = priceFutures.stream().map(CompletableFuture::join).collect(toList());
7. 使用 ParallelStream 還是 CompletableFuture
- 計算密集型使用 parallelStream() , 其默認(rèn)的最大并行數(shù)就是 CPU核心數(shù),不用額外維護其他參數(shù)
- IO密集或者等待時間不穩(wěn)定的,使用 CompletableFuture
8. 使用 CompletableFuture 編排異步任務(wù)
- 有依賴關(guān)系
List<CompletableFuture<String>> priceFutures =shops.stream()// 獲取價格 (異步).map(shop -> CompletableFuture.supplyAsync(() -> shop.getPrice(product))// 解析報價.map(future -> future.thenApply(Quota::parse))// 為計算折扣價構(gòu)造 future (異步) 【該異步任務(wù)需要等待報告被解析出來】.map(future -> future.thenCompose(quota ->CompletableFuture.supplyAsync(() -> Discount.applyDiscount(quote), executor)).collect(toList());
getPrice 和 applyDiscount 都是非阻塞調(diào)用,會比阻塞調(diào)用快一點
- 無依賴關(guān)系
Future<Double> futurePriceInUSD =shops.stream()// 獲取價格 (異步).map(shop -> CompletableFuture.supplyAsync(() -> shop.getPrice(product))// 獲取匯率.thenCombine(CompletableFuture.supplyAsync(() -> exchangeService.getRate(Money.EUR, Money.USD)),// 兩個異步任務(wù)整合, 哪個值先獲取到無所謂(price, rate) -> price * rate);
9. CompletableFuture 響應(yīng) completion 事件
CompletableFuture[] futures =shops.stream().map(shop -> CompletableFuture.supplyAsync(() -> shop.getPrice(product)).map(future -> future.thenApply(Quota::parse)).map(future -> future.thenCompose(quota ->CompletableFuture.supplyAsync(() -> Discount.applyDiscount(quote), executor))// 【定義事件完成后做什么事】.map(f -> thenAccept(System.out.println)).toArray(size -> new CompletableFuture[size]);// 等待所有子線程執(zhí)行完成
CompletableFuture.allOf(futures).join();
10. CompletableFuture 異常處理
- 引用最早的一個代碼
public Future<Double> getPriceAsync(String product) {// 用于接收異步任務(wù)的響應(yīng)CompletableFuture<Double> futurePrice = new CompletableFuture<>();// 異步任務(wù)new Thread( () -> {try {double price = calculatePrice(product);// 異步任務(wù)完成后通知(帶上返回值)futurePrice.complete(price);} catch (Exception ex) {// 異步任務(wù)有異常,也會通知調(diào)用方futurePrice.completeExceptionally(ex);}}).start();return futurePrice;
}
如果 calculatePrice 拋出異常,即 futurePrice.completeExceptionally(ex) 后,futurePrice 的調(diào)用端也會拋出運行時異常。這個異常處理也會封裝在 CompletableFuture.supplyAsync(() -> calculatePrice(product)); 的api中
- exceptionally
參考這篇文章