|
作者:肖飛,2011年加入京東,目前在交易平臺,主導(dǎo)交易平臺核心系統(tǒng)的架構(gòu)優(yōu)化和技術(shù)攻關(guān),以及公共技術(shù)組件和平臺的建設(shè)。
龐大復(fù)雜的系統(tǒng)通常會采用服務(wù)化組件來實(shí)現(xiàn)。系統(tǒng)越復(fù)雜,組件之間的依賴和調(diào)用關(guān)系也會越復(fù)雜。對于處于底層的基礎(chǔ)服務(wù),直接和間接的調(diào)用所帶來的流量壓力非常大。處于中間層的聚合型服務(wù),面對的挑戰(zhàn)則是依賴的服務(wù)太多,后端個別服務(wù)的性能延遲就會影響其吞吐量。性能優(yōu)化是我們系統(tǒng)穩(wěn)定性中的重要一環(huán),這其中,調(diào)用所依賴的RPC服務(wù)或后端數(shù)據(jù)是重點(diǎn)之一。
目前,除了傳統(tǒng)JDBC這樣從API到主流驅(qū)動實(shí)現(xiàn)就是阻塞式的類庫之外,其他常用的RPC/HTTP服務(wù)、MQ、Redis、Mongodb、Kafka等系統(tǒng)都提供了成熟的基于NIO的客戶端庫,也有相應(yīng)的異步API。
但是目前交易平臺的大多數(shù)中臺服務(wù)系統(tǒng),還在習(xí)慣性使用著這些庫的同步API,并不能充分的利用CPU,這也給我們帶來了一定的優(yōu)化空間。從16年開始我們在一些核心的但是服務(wù)邏輯相對簡單的系統(tǒng)中使用異步方式來實(shí)現(xiàn),雖然暫時還做不到完全的異步化,但是也取得了比較好的效果。這篇文章雖然更多是一個簡介性質(zhì),但是也涵蓋了我們在異步編程中需要關(guān)注的要點(diǎn)。希望大家能夠習(xí)慣和擁抱異步編程。
一、相關(guān)概念介紹同步(Synchronous)/異步(Asynchronous),通常是指函數(shù)調(diào)用中的消息通信的兩種不同模式。
1、異步和同步的區(qū)別函數(shù)調(diào)用發(fā)生時,消息(參數(shù))從caller傳遞到callee,控制權(quán)(指令執(zhí)行)從caller轉(zhuǎn)移到callee。調(diào)用返回時,控制權(quán)從callee轉(zhuǎn)移到caller。兩者的區(qū)別在于,callee是否需要等待執(zhí)行完成才將控制權(quán)轉(zhuǎn)移給caller。
在RPC這種更復(fù)雜的場景下,本質(zhì)上并沒有不同。
◆ 同步 
1.callee執(zhí)行完成才返回 2.返回值即結(jié)果
◆ 異步 
1.callee不需要執(zhí)行完成就可返回 2.caller要獲取結(jié)果,需要通過輪詢、回調(diào)等機(jī)制
◆ 同步RPC
◆ 異步RPC
可以看到,在異步RPC的場景下,客戶端和服務(wù)端用于處理IO的CPU能得到充分利用,通常只需要遠(yuǎn)低于caller請求數(shù)量的線程就可以了,這就是多路復(fù)用。
2、callee執(zhí)行機(jī)制上圖中callee的background execute通常是采用池化線程來完成的,比如ThreadPoolExecutor或EventLoop1。
3、caller獲取執(zhí)行結(jié)果caller調(diào)用callee時,如果需要獲取執(zhí)行結(jié)果(消息雙向傳遞),或者獲知執(zhí)行是否完成(消息單向傳遞無返回值),在異步模式下,主要依靠下面兩種機(jī)制。
◆ 輪詢(Polling)比如Java的Future就提供了isDone()這種詢問機(jī)制。 1.//Caller.java
2.void call() {
3. FutureVoid> f = callee.asyncCall(param);
4. // do some other things
5. while(true) {
6. if (f.isDone()) break;
7. //do some other things or sleep or timeout
8. }
9.}
或阻塞版本 1.//Caller.java 2.void call() {
3. FutureVoid> f = callee.asyncCall(param);
4. // do some other things
5. f.get(timeout, TimeUnit.SECONDS);
6.}
輪詢的控制邏輯在caller端。
◆ 回調(diào)(Callback)caller設(shè)置一個回調(diào)函數(shù),供callee執(zhí)行完成后調(diào)用這個函數(shù)?;卣{(diào)的控制是反轉(zhuǎn)的,通常由callee端控制。 1.//Caller.java
2.void call() {
3. callee.asyncCall(param, new AsyncHandlerResponseMessage>>() {
4. @Override public void handleResponse(ResponseMessage> response) {
5. msg = response.get();
6. // process the msg...
7. }
8. });
9. // do some other things
10.}
4、異步模式的場景
◆ 阻塞阻塞(Blocking)/非阻塞(Non-Blocking)是用來描述,在等待調(diào)用結(jié)果時caller線程的狀態(tài)。阻塞,通常意味著caller線程不再使用CPU時間,處于可被OS調(diào)度的狀態(tài)(注意與Java線程狀態(tài)2的區(qū)別)。 磁盤IO和網(wǎng)絡(luò)IO是常見的會引起線程阻塞的場景3。受制于底層OS的同步阻塞式IO系統(tǒng)函數(shù),調(diào)用Java OIO(Old blocking IO) API無疑是會阻塞的。對于DiskIO,Java NIO2提供了異步API。對于SocketIO,Java NIO2以及NIO框架Netty,都提供了異步API。 ◆ Linux提供了異步IO系統(tǒng)函數(shù),只能用于DiskIO,還有一些限制4,Java NIO2 AsynchronousFileChannel內(nèi)部仍然使用線程池+阻塞式API的實(shí)現(xiàn)。 ◆ Linux為SocketIO準(zhǔn)備就緒階段提供了非阻塞式API(select/poll/epoll),但是IO執(zhí)行階段仍然是同步阻塞的,因此主流的Java NIO框架的Reactor模式內(nèi)部實(shí)現(xiàn)使用了線程池。
◆ 并行比如需要調(diào)用多個沒有依賴關(guān)系的服務(wù),或者訪問分散在多個存儲分片中的數(shù)據(jù),如果服務(wù)接口或數(shù)據(jù)訪問接口實(shí)現(xiàn)了異步API,那么就很方便實(shí)現(xiàn)并行調(diào)用,減少總體調(diào)用耗時。
◆ 速度不匹配使用中間隊列解偶caller和callee的速度不匹配問題,削峰填谷。
◆ 批量使用中間隊列解偶caller和callee的速度不匹配問題,削峰填谷。
二、異步API的幾種風(fēng)格
1、Callback這個比較傳統(tǒng),比如zookeeper客戶端提供的基于回調(diào)的異步API: 1.try {
2. zookeeper.create(path, data, acl, createMode, new StringCallback() {
3. public void processResult(int rc, String path, Object ctx, String name) {
4. if (rc != KeeperException.Code.OK.intValue()) {
5. // error handle
6. } else {
7. // success process
8. // 如果需要在成功后再發(fā)起基于回調(diào)的異步調(diào)用,會形成callback hell
9. }
10. }
11. }, ctx);
12.} catch( Throwable e) {
13. // error handle
◆ Callback通常是無狀態(tài)的 ◆ 要獲取Callback的計算結(jié)果,通常需要closure ◆ 異常處理比較分散 ◆ 在有多個異步調(diào)用鏈的時候,容易造成Callback hell
2、Future/PromisePromise是callee給caller的憑證,代表未完成但承諾完成(成功或失?。┑慕Y(jié)果。Promise本身是有狀態(tài)的,通常由callee端維護(hù)。其狀態(tài)轉(zhuǎn)移如下(術(shù)語參考Promise/A+和ES6): 
Promise的狀態(tài)只能轉(zhuǎn)移一次,因此如果有callback,那么.then(callback)或.catch(callback)也只被執(zhí)行一次。
JDK5的Future只能用輪詢或者阻塞的方式獲取結(jié)果,caller端處理比較繁瑣。Guava的ListenableFuture,特別是JDK8的CompletableFuture,則是完整實(shí)現(xiàn)了Promise風(fēng)格的異步API。 個人認(rèn)為Promise是更好的Callback,ListenableFuture接口只是比Future多了一個void addListener(Runnable, Executor)方法。 Promise提供了比Callback更易用更清晰的編程模式,尤其是涉及多個異步API的串行調(diào)用(chaining或pipelining )、組合調(diào)用(并行、合并)、異常處理等方面有很大的優(yōu)勢。 引申閱讀 ◆ 這篇文章談到了Future和Promise的細(xì)微區(qū)別、相關(guān)歷史和技術(shù)。 ◆ 這里有一些討論:Aren’t promises just callbacks?,Is there really a fundamental difference between callbacks and Promises?。 ◆ Promise借鑒了函數(shù)式中的一些概念: 從函數(shù)式編程到Promise。 ◆ 這篇文章簡要對比了幾種語言中的Promise框架。
3、ReactiveX其官方網(wǎng)站的介紹 An API for asynchronous programming with observable streams
其關(guān)鍵的概念Observable比較Promise來說: ◆ Promise代表一個異步計算值,而Observable代表著一系列值(stream)。 ◆ Promise的值只能產(chǎn)生一次,而Observable的事件可以不斷產(chǎn)生。因此Rx首先流行在前端UI場景:事件來源多,數(shù)據(jù)變化影響多個UI組件的變更。
Rx的學(xué)習(xí)曲線比Promise要高得多,而且目前Promise風(fēng)格的異步編程能夠滿足我們大部分的服務(wù)端開發(fā)場景,因此我們這里主要關(guān)注Promise。
三、Promise在服務(wù)端的應(yīng)用下面穿插著以JDK8的CompletableFuture和Guava的ListenableFuture(適用JDK6)為例介紹Promise的用法。
1、符合Promise風(fēng)格的方法簽名Promise風(fēng)格的方法簽名,有個不成文的規(guī)則是不拋出異常,因?yàn)楫惓J荘romise對象本身就能攜帶的兩種狀態(tài)之一。比如我們想把一個Callback風(fēng)格的異步API包裝成Promise風(fēng)格的(通常在使用一個較老的類庫時需要這樣的包裝),可以這樣: 1.//caller.java
2.CompletableFutureString> asyncCall(final String msg) {
3. CompletableFutureString> promise = new CompletableFuture<>();
4. try {
5. callee.asyncCall(msg, new CallbackString>() {
6. public void onSuccess(String r) { promise.complete(r); }
7. public void onFail(Throwable t) { promise.completeExceptionally(t); }
8. });
9. } catch (Throwable e) {
10. promise.completeExceptionally(t);
11. }
12. return promise;
13.}
下面是使用ListenableFuture的實(shí)現(xiàn)異步發(fā)送消息的API。 1.//LocalMessageEngine.java 2.static class WriteTask {
3. // SettableFuture是Guava中一種可設(shè)置狀態(tài)的Promise類型。
4. final SettableFutureBoolean> promise = SettableFuture.create();
5. final byte[] message;
6. WriteTask(byte[] message) {
7. this.message = message;
8. }
9.}
10.public Producer createProducer() {
11. return new Producer() {
12. public ListenableFutureBoolean> asyncProduce(byte[] message) {
13. if (!Engine.this.started) {
14. // 返回前已完成
15. return Futures.immediateFailedFuture(new IllegalStateException('Message engine was stopped or not started'));
16. }
17.
18. WriteTask task = new WriteTask(message);
19. boolean queued = writeTaskQueue.offer(task);
20. if (!queued) {
21. task.promise.set(Boolean.FALSE);
22. }
23. return task.promise;
24. }
25. };
26.}
上面兩個例子,描述了如何創(chuàng)建一個Promise對象返回給caller,以及如何在callee端fulfill或reject這個Promise。你可能會發(fā)現(xiàn),返回給caller之前Promise是可以處于完成狀態(tài)的。在繼續(xù)下面的使用介紹前,先簡單的看下ListenableFuture和CompletableFuture的幾個主要API。 
2、串行調(diào)用 ListenableFuture沒有提供then方法,而是通過Futures的一系列靜態(tài)方法來實(shí)現(xiàn)Promise風(fēng)格的API。由于兩者有大部分的API是可以相互轉(zhuǎn)化的,限于篇幅下面就不全部演示了。Promse的callback就是其then或catch方法的函數(shù)型參數(shù),當(dāng)Promise被resolve時執(zhí)行這個callback函數(shù)。這個callback函數(shù)的輸入,就是Promise的resolved值。根據(jù)callback函數(shù)的輸出的不同,需要采取不同的then方法。
◆ callback無輸出Futures.addCallback和CompletableFuture.thenAccept接受無輸出的callback。 1.// Guava
2.ListenableFutureQueryResult> promise1 = ...;
3.Futures.addCallback(promise1, new FutureCallbackQueryResult>() {
4. public void onSuccess(QueryResult result) {
5. storeInLocalCache(result);
6. }
7. public void onFailure(Throwable t) {
8. reportError(t);
9. }
10.});
1.// CompletableFuture
2.CompletableFutureQueryResult> promise1 = ...;
3.CompletableFutureVoid> promise2 = promise1.thenAccept(result -> storeInLocalCache(result));
4.return promise2;
5.//return promise1.thenAccept(result -> storeInLocalCache(result));//thenAccept返回另一個Promise實(shí)例
先忽略異常處理。對比下這種場景下的ListenableFuture和CompletableFuture:前者采取了更傳統(tǒng)的callback風(fēng)格,后者則返回一個新的Promise實(shí)例,callback計算完畢則promise2被fulfilled,很容易通過promise2來獲取callback執(zhí)行完畢與否,不需要closure。
◆ callback輸出一個普通計算值這種情況下callback就是一個轉(zhuǎn)換函數(shù),輸入是前一個Promise的fulfilled值,輸出則作為新Promise的fulfilled值。Futures.transform和CompletableFuture.thenApply接收這樣的callback函數(shù)。 1.// CompletableFuture
2.CompletableFutureQueryResult> queryFuture = ...;
3.CompletableFutureListRow>> rowsFuture = queryFuture.thenApply(result -> result.getRows());
4.return rowsFuture;
◆ callback輸出一個異步計算值,即一個Promise乍一看,這種情況下的輸出跟上一種好像沒什么區(qū)別。但實(shí)際上,輸出一個Promise值和輸出一個普通的值有根本的區(qū)別。還記得吧,Promise代表著一個未完成的并且承諾完成的值。通常這種情況下,意味著callback里調(diào)用了另外一個Promise風(fēng)格的異步API。比如下面的例子中indexService.lookUp和dataService.read方法,由于涉及到IO,都設(shè)計為異步API。 1.//Guava
2.ListenableFutureRowKey> rowKeyFuture = indexService.lookUp(query);
3.AsyncFunctionRowKey, QueryResult> queryFunction = new AsyncFunctionRowKey, QueryResult>() {
4. public ListenableFutureQueryResult> apply(RowKey rowKey) {
5. return dataService.read(rowKey);
6. }
7.};
8.ListenableFutureQueryResult> queryFuture = Futures.transformAsync(rowKeyFuture, queryFunction);
9.return queryFuture;
Futures.transformAsync和CompletableFuture.thenCompose接收這樣的callback函數(shù)。
設(shè)想一下,如果某個邏輯中需要調(diào)用的多個Promise風(fēng)格的異步方法(比如多個RPC調(diào)用),并且有先后依賴關(guān)系,即上一個方法的執(zhí)行結(jié)果作為下一個方法的輸入。就可以用thenCompose把他們串起來。 1.//CompletableFuture
2.CompletableFuture promise4 = rpc1.call(input) //promise1
3. .thenCompose(rpc1Result -> rpc2.call(rpc1Result)) //promise2
4. .thenCompose(rpc2Result -> rpc3.call(rpc2Result)) //promise3
5. .thenCompose(rpc3Result -> rpc4.call(rpc3Result)) //promise4
6.return promise4;
不要被鏈?zhǔn)秸{(diào)用給忽悠了,你還是可以正常使用普通的風(fēng)格。
單純看來,上述的串行調(diào)用場景下使用Promise風(fēng)格的API好像只是消除了Callback hell。那么采用同步API就既沒有Callback hell的問題,又符合數(shù)據(jù)依賴關(guān)系??墒?,你會發(fā)現(xiàn),上面的舉例中結(jié)尾都返回了Promise,就是說,包含這段代碼的方法被設(shè)計為異步API。而使用同步API,則會強(qiáng)制這個方法的調(diào)用者只能使用同步方式調(diào)用。
3、并行調(diào)用異步API很適合并行調(diào)用。caller在調(diào)用多個沒有依賴關(guān)系的異步API時,可以先依次發(fā)起調(diào)用而不用等待每個調(diào)用真正執(zhí)行完成,從callee的角度來講,執(zhí)行是并行的。caller可以對調(diào)用結(jié)果進(jìn)行合并處理,關(guān)鍵是,合并也是異步風(fēng)格的。 1.//Guava
2.ListListenableFutureQueryResult>> partialPromises = new ArrayListListenableFutureQueryResult>>(nodes.size());
3.for (Node node : nodes) {
4. partialPromises.add(lookupHandler(node).query());
5.}
6.ListenableFutureListRow>> mergedPromise = Futures.transform(Futures.allAsList(partialPromises), new FunctionListListRow>>, ListRow>>() {
7. @Override public Long apply(ListListRow>> input) {
8. return merge(input);
9. }
10.})
11.return mergedPromise;
Futures.allAsList是并行執(zhí)行所有的promises,若有一個promise異常完成則嘗試reject尚未resolved的promise。也可以使用Futures.successfulAsList,區(qū)別在于后者并不會reject尚未resolved的promise。CompletableFuture的對應(yīng)物是allOf和anyOf。
4、調(diào)用編排合并結(jié)果設(shè)計為異步風(fēng)格的好處在于,很方便做合并、串行混合調(diào)用編排,比如某個邏輯中需要調(diào)用四個個RPC服務(wù)A、B、C、D,其中:A的輸出作為B、C的輸入,B、C可并行,B、C的輸出合并后作為D的輸入。 1.//CompletableFuture
2.CompletableFutureAResult> promiseA = rpcA.call(input);
3.CompletableFutureDResult> promiseD = promiseA.thenCompose(aResult -> {
4. CompletableFutureBResult> promiseB = rpcB.call(aResult);
5. CompletableFutureCResult> promiseC = rpcC.call(aResult);
6. CompletableFutureMergedResult> mergedPromise = promiseB.thenCombine(promiseC, (bResult, cResult) -> {
7. return merge(bResult, cResult)
8. });
9. return mergedPromise;
10.}).thenCompose(mergedResult -> rpcD.call(mergedResult));
11.return promiseD;
5、異常處理上面提到過Callback風(fēng)格的異步API,異常處理比較分散。而Promise風(fēng)格的異常處理則優(yōu)雅得多。我們需要記住,異常是Promise攜帶的兩種狀態(tài)之一。那么異??梢宰鳛閏allback函數(shù)的輸入。
◆ 通用異常處理Futures.catching和CompletableFuture.exceptionally接收異常值為參數(shù)的callback函數(shù)。 1.//Guava 2.ListenableFutureInteger> fetchCounterPromise = ...;
3.// Falling back to a zero counter in case an exception happens when
4.// processing the RPC to fetch counters.
5.ListenableFutureInteger> faultTolerantPromise = Futures.catching(
6. fetchCounterPromise, FetchException.class,
7. new FunctionFetchException, Integer>() {
8. public Integer apply(FetchException e) {
9. return 0;
10. }
11. });
我們再看一個更復(fù)雜的例子。 1.//CompletableFuture 2.CompletableFutureFaultTolerantResult> faultTolerantPromise = rpc1.call(input) //promise1
3. .thenCompose(rpc1Result -> rpc2.call(rpc1Result)) //promise2
4. .thenCompose(rpc2Result -> rpc3.call(rpc2Result)) //promise3
5. .thenCompose(rpc3Result -> rpc4.call(rpc3Result)) //promise4
6. .exceptionally(err -> {
7. // process err
8. return faultTolerantValue;
9. }); //faultTolerantPromise
10.return faultTolerantPromise;
再提醒一遍,不要被鏈?zhǔn)秸{(diào)用迷惑了。這個例子里面,rpc1/rpc2/rpc3/rpc4都有可能發(fā)生異常,但我們只需要在最后統(tǒng)一處理(返回異常值或轉(zhuǎn)換為一個默認(rèn)正常值)。比如rpc2發(fā)生異常,那么rpc3/rpc4的邏輯(接收正常值的callback函數(shù))都不會執(zhí)行,但是rpc2的異常會傳遞給promise3/promise4。
◆ 恢復(fù)假設(shè)在讀取某個數(shù)據(jù)存儲發(fā)生異常,我們需要某種恢復(fù)機(jī)制,比如讀取另一個backup的數(shù)據(jù)存儲(某種重試),那么可以使用Futures.cachingAsync和CompletableFuture.handle。 1.//CompletableFuture
2.public CompletableFuture dispatch(final Command command) {
3. final CompletableFuture dispatched = loadbalance.selectHandler().dispatch(command);
4. if (maxTries > 0) {
5. final AtomicInteger leftTries = new AtomicInteger(maxTries);
6. final BiFunctionCompletableFuture, Throwable, CompletableFuture> fallback = new BiFunctionCompletableFuture, Throwable, CompletableFuture>() {
7. @Override
8. public CompletableFuture apply(CompletableFuture input, Throwable cause) {
9. if (cause == null) return input;
10. if (cause instanceof RecoverableException && leftTries.getAndDecrement() > 0) {
11. final CompletableFuture next = loadbalance.selectHandler().dispatch(new Command(command.type, command.args));
12. return next.handle((v, err) -> err).thenCompose(err -> apply(next, err));
13. }
14. CompletableFuture errFuture = new CompletableFuture<>();
15. errFuture.completeExceptionally(cause);
16. return errFuture;
17. }
18. };
19. return dispatched.handle((v, err) -> err).thenCompose(err -> fallback.apply(dispatched, err));
20. }
21. return dispatched;
22.}
◆ 超時Promise是一個承諾完成(成功或失敗)的結(jié)果,但是并不承諾完成時間。所以,通常需要一種超時機(jī)制,幸運(yùn)的是ListenableFuture和CompletableFuture都實(shí)現(xiàn)了Future接口。 1.CompletableFutureFaultTolerantResult> faultTolerantPromise = ...;
2.try {
3. FaultTolerantResult result = faultTolerantPromise.get(1000, TimeUnit.SECONDS);
4. // process result
5.} catch (TimeoutException e) {
6. //嘗試取消執(zhí)行尚未開始的callback函數(shù)
7. faultTolerantPromise.cancel(false);
8.}
四、Promise異步編程的注意點(diǎn)異步編程比同步編程困難。異步編程通常主要解決一小部分問題,比如阻塞。Promise借鑒了函數(shù)式編程的風(fēng)格,大量的邏輯會分散在各種callback函數(shù)來實(shí)現(xiàn)。因此對于習(xí)慣了同步編程的OO式或命令式編程風(fēng)格的開發(fā)人員,需要一定的習(xí)慣時間。
上面談到callee執(zhí)行機(jī)制的時候,談到了線程池,那么callee計算完成時,callback函數(shù)的執(zhí)行通常是池中resolve Promise的線程執(zhí)行。但是,如果caller在設(shè)置callback的時候,Promise已經(jīng)完成,那么callback的執(zhí)行線程則是caller線程。因此,請?zhí)貏e關(guān)注callback函數(shù)的執(zhí)行線程的差別。請遵循: ◆ callback盡量輕量 ◆ callback避免阻塞 ◆ 否則請指定執(zhí)行線程
如果callback執(zhí)行了大量的計算,甚至執(zhí)行了阻塞式操作,那么就很有可能阻塞住Promise的resolve線程,通常這類線程都是極少的,比如執(zhí)行IO的EventLoop線程,有可能造成其他Promise得不到執(zhí)行。 摘自ListenableFuture的文檔: Note: For fast, lightweight listeners that would be safe to execute in any thread, consider MoreExecutors.directExecutor. Otherwise, avoid it. Heavyweight directExecutor listeners can cause problems, and these problems can be difficult to reproduce because they depend on timing. For example: ◆ The listener may be executed by the caller of addListener. That caller may be a UI thread or other latency-sensitive thread. This can harm UI responsiveness. ◆ The listener may be executed by the thread that completes this Future. That thread may be an internal system thread such as an RPC network thread. Blocking that thread may stall progress of the whole system. It may even cause a deadlock. ◆ The listener may delay other listeners, even listeners that are not themselves directExecutor listeners.
我們也的確碰到過使用MoreExecutors.directExecutor時,由于編寫了太過復(fù)雜的callback鏈,導(dǎo)致線程死鎖的問題。
CompletableFuture和ListenableFuture都有指定callback執(zhí)行線程的方法: 1.//使用內(nèi)置的通用ForkJoin線程池
2.completableFuture.thenAcceptAsync(callback);
3.//使用指定的線程執(zhí)行器
4.completableFuture.thenAcceptAsync(callback, executor);
5.//使用指定的線程執(zhí)行器
6.Futures.transform(input, callback, executor);
正因?yàn)楫惒骄幊痰膹?fù)雜性,因此目前我們也盡量在業(yè)務(wù)邏輯相對簡單的應(yīng)用上進(jìn)行異步化改造。后續(xù),我們也會評估Quasar等協(xié)程框架。
完~
==============================
|