小男孩‘自慰网亚洲一区二区,亚洲一级在线播放毛片,亚洲中文字幕av每天更新,黄aⅴ永久免费无码,91成人午夜在线精品,色网站免费在线观看,亚洲欧洲wwwww在线观看

分享

ExecutorService的十個使用技巧

 hh3755 2015-06-24
Published: 26 Nov 2014 Category: Java

ExecutorService這個接口從Java 5開始就已經(jīng)存在了。這得追溯到2004年了。這里小小地提醒一下,官方已經(jīng)不再支持Java 5, Java 6了,Java 7在半年后也將停止支持。我之所以會提起ExecutorService這么舊的一個接口是因為,大多數(shù)Java程序員并沒有搞清楚它的工作原理。關(guān)于它可以介紹的有很多,這里我只想分享它的一些較少為人所知的特性以及實踐技巧。本文主要是面向初級程序員的,并沒有過于高深的東西。

  1. 線程命名

這點得反復(fù)強調(diào)。對正在運行的JVM進行線程轉(zhuǎn)儲(thread dump)或者調(diào)試時,線程池默認的命名機制是pool-N-thread-M,這里N是線程池的序號(每新創(chuàng)建一個線程池,這個N都會加一),而M是池里線程的序號。比方說,pool-2-thread-3指的是JVM生命周期中第二個線程池里的第三個線程。參考這里 Executors.defaultThreadFactory()。這樣的名字表述性不佳。由于JDK將命名機制都隱藏在ThreadFactory里面,這使得要正確地命名線程得稍微費點工夫。所幸的是Guava提供了這么一個工具類:

import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
final ThreadFactory threadFactory = new ThreadFactoryBuilder()
        .setNameFormat("Orders-%d")
        .setDaemon(true)
        .build();
final ExecutorService executorService = Executors.newFixedThreadPool(10, threadFactory);
  1. 根據(jù)上下文切換名字

這是我從高效的jstack:如何對高速運行的服務(wù)器進行調(diào)試一文中學(xué)到的一個技巧。線程名可以隨時進行修改,只要你想這么做的話。這是有一定的意義的,因為線程轉(zhuǎn)儲只能看到類名和方法名,而沒有參數(shù)及本地變量。通過調(diào)整線程名可以保留一些比較關(guān)鍵的上下文信息,這樣排查消息/記錄/查詢等變慢或者出現(xiàn)死鎖的問題時就容易多了。示例:

private void process(String messageId) {
    executorService.submit(() -> {
        final Thread currentThread = Thread.currentThread();
        final String oldName = currentThread.getName();
        currentThread.setName("Processing-" + messageId);
        try {
            //real logic here...
        } finally {
            currentThread.setName(oldName);
        }
    });
}

在try-finally塊中當(dāng)前線程的名字是Processing-某個消息ID。這對跟蹤系統(tǒng)內(nèi)的消息流會比較有用。

  1. 顯式地安全地關(guān)閉線程

客戶端線程和線程池之間會有一個任務(wù)隊列。當(dāng)程序要關(guān)閉時,你需要注意兩件事情:入隊的這些任務(wù)的情況怎么樣了以及正在運行的這個任務(wù)執(zhí)行得如何了。令人驚訝的是很多開發(fā)人員并沒能正確地或者有意識地去關(guān)閉線程池。正確的方法有兩種:一個是讓所有的入隊任務(wù)都執(zhí)行完畢(shutdown()),再就是舍棄這些任務(wù)(shutdownNow())——這完全取決于你。比如說如果我們提交了N多任務(wù)并且希望等它們都執(zhí)行完后才返回的話,那么就使用shutdown():

private void sendAllEmails(List<String> emails) throws InterruptedException {
    emails.forEach(email ->
            executorService.submit(() ->
                    sendEmail(email)));
    executorService.shutdown();
    final boolean done = executorService.awaitTermination(1, TimeUnit.MINUTES);
    log.debug("All e-mails were sent so far? {}", done);
}

本例中我們發(fā)送了許多電子郵件,每一封郵件都對應(yīng)著線程池中的一個任務(wù)。提交完這些任務(wù)后我們會關(guān)閉線程池,這樣就不會再有新的任務(wù)進來了。然后我們會至少等待一分鐘,直到這些任務(wù)執(zhí)行完。如果1分鐘后還是有的任務(wù)沒執(zhí)行到的話,awaitTermination()便會返回false。但是剩下的任務(wù)還會繼續(xù)執(zhí)行。我知道有些趕時髦的人會這么寫:

emails.parallelStream().forEach(this::sendEmail);

他們覺得我那樣很老套,不過我個人比較喜歡能控制并發(fā)線程的數(shù)量。還有一個優(yōu)雅地關(guān)閉掉線程池的方法就是shutdownNow():

final List<Runnable> rejected = executorService.shutdownNow();
log.debug("Rejected tasks: {}", rejected.size());

這么做的話隊列中的所有任務(wù)都會被舍棄并返回。已執(zhí)行的任務(wù)仍會繼續(xù)執(zhí)行。

  1. 謹慎地處理中斷

Future的一個較少提及的特性便是cancelling。這里我就不重復(fù)多說了,可以看下我之前的一篇文章:InterruptedException及線程中斷。

  1. 監(jiān)控隊列長度,確保隊列有界

不當(dāng)?shù)木€程池大小會使得處理速度變慢,穩(wěn)定性下降,并且導(dǎo)致內(nèi)存泄露。如果配置的線程過少,則隊列會持續(xù)變大,消耗過多內(nèi)存。而過多的線程又會由于頻繁的上下文切換導(dǎo)致整個系統(tǒng)的速度變緩——殊途而同歸。隊列的長度至關(guān)重要,它必須得是有界的,這樣如果線程池不堪重負了它可以暫時拒絕掉新的請求:

final BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(100);
executorService = new ThreadPoolExecutor(n, n,
        0L, TimeUnit.MILLISECONDS,
        queue);

上面的代碼等價于Executors.newFixedThreadPool(n),然而不同的是默認的實現(xiàn)是一個無界的LinkedBlockingQueue。這里我們用的是一個固定100大小的ArrayBlockingQueue。也就是說如果已經(jīng)有100個任務(wù)在隊列中了(還有N個在執(zhí)行中),新的任務(wù)就會被拒絕掉,并拋出RejectedExecutionException異常。由于這里的隊列是在外部聲明的,我們還可以時不時地調(diào)用下它的size()方法來將隊列大小記錄在到日志/JMX/或者你所使用的監(jiān)控系統(tǒng)中。

  1. 別忘了異常處理

下面這段代碼執(zhí)行的結(jié)果是什么?

executorService.submit(() -> {
    System.out.println(1 / 0);
});

我被它坑過無數(shù)回了:它什么也不會輸出。沒有任何的java.lang.ArithmeticException: / by zero的征兆,啥也沒有。線程池會把這個異常吞掉,就像什么也沒發(fā)生過一樣。如果是你自己創(chuàng)建的java.lang.Thread還好,這樣UncaughtExceptionHandler還能起作用。不過如果是線程池的話你就得小心了。如果你提交的是Runnable對象的話(就像上面那個一樣,沒有返回值),你得將整個方法體用try-catch包起來,至少打印一下異常。如果你提交的是Callable的話,得確保你在用get()方法取值的時候重新拋出異常:

final Future<Integer> division = executorService.submit(() -> 1 / 0);
//below will throw ExecutionException caused by ArithmeticException
division.get();

有趣的是Spring框架的@Async為此還弄出了個BUG,參見:SPR-8995以及 SPR-12090

  1. 監(jiān)控隊列中的等待時間

監(jiān)控工作隊列的長度只是一個方面。然而排除故障時查看從提交任務(wù)到實際執(zhí)行之間的時間差就顯得非常重要了。這個時間差越接近0就越好(說明正好線程池中有空閑的線程),否則任務(wù)要入隊的話這個時間就會增加了。再進一步說,如果線程池不是固定線程數(shù)的話,執(zhí)行新的任務(wù)還得新創(chuàng)建一個線程,這個同樣也會消耗一定的時間。為了能更好地監(jiān)控這項指標,可以對ExecutorService做一下封裝:

public class WaitTimeMonitoringExecutorService implements ExecutorService {
 
    private final ExecutorService target;
 
    public WaitTimeMonitoringExecutorService(ExecutorService target) {
        this.target = target;
    }
 
    @Override
    public <T> Future<T> submit(Callable<T> task) {
        final long startTime = System.currentTimeMillis();
        return target.submit(() -> {
                    final long queueDuration = System.currentTimeMillis() - startTime;
                    log.debug("Task {} spent {}ms in queue", task, queueDuration);
                    return task.call();
                }
        );
    }
 
    @Override
    public <T> Future<T> submit(Runnable task, T result) {
        return submit(() -> {
            task.run();
            return result;
        });
    }
 
    @Override
    public Future<?> submit(Runnable task) {
        return submit(new Callable<Void>() {
            @Override
            public Void call() throws Exception {
                task.run();
                return null;
            }
        });
    }
 
    //...
 
}

這個實現(xiàn)并不完整,不過也能說明大概的意思了。當(dāng)我們將任務(wù)提交給線程池的時候,便立即開始記錄它的時間。一旦這個任務(wù)被取出并開始執(zhí)行時便停止計時。不要被代碼中的startTime和queueDuration這兩個變量搞混了。事實上它們是在兩個不同的線程中進行求值的,通常都會差個毫秒級或者秒級:

Task com.nurkiewicz.MyTask@7c7f3894 spent 9883ms in queue
  1. 保留客戶端的棧跟蹤信息

近來響應(yīng)式編程受到了不少關(guān)注。 Reactive manifesto, reactive streams, RxJava(僅發(fā)布了1.0版本!),Clojure agents, scala.rx等等。它們都非常不錯,但棧跟蹤信息就完蛋了,它們幾乎是毫無價值的。假設(shè)提交到線程池中的一個任務(wù)出現(xiàn)了異常:

java.lang.NullPointerException: null
    at com.nurkiewicz.MyTask.call(Main.java:76) ~[classes/:na]
    at com.nurkiewicz.MyTask.call(Main.java:72) ~[classes/:na]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[na:1.8.0]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[na:1.8.0]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ~[na:1.8.0]
    at java.lang.Thread.run(Thread.java:744) ~[na:1.8.0]

可以很容易發(fā)現(xiàn)NPE異常出現(xiàn)在MyTask的76行。但是我們并不知道是誰提交的這個任務(wù),因為棧信息只能看到Thread以及ThreadPoolExecutor。技術(shù)上來講我們當(dāng)然是可以看下代碼,看看是何處創(chuàng)建的MyTask。不過如果沒有線程在這中間的話,我們馬上便能知道是誰提交的任務(wù)。那么如果我們可以保留客戶端代碼(提交任務(wù)的那段代碼)的棧信息呢?這個想法并非我首創(chuàng)的,Hazelcast就將異常從所有者節(jié)點傳播到了客戶端中。下面是一個非常簡單的將客戶端棧信息保留下來以便失敗時查看的例子:

public class ExecutorServiceWithClientTrace implements ExecutorService {
 
    protected final ExecutorService target;
 
    public ExecutorServiceWithClientTrace(ExecutorService target) {
        this.target = target;
    }
 
    @Override
    public <T> Future<T> submit(Callable<T> task) {
        return target.submit(wrap(task, clientTrace(), Thread.currentThread().getName()));
    }
 
    private <T> Callable<T> wrap(final Callable<T> task, final Exception clientStack, String clientThreadName) {
        return () -> {
            try {
                return task.call();
            } catch (Exception e) {
                log.error("Exception {} in task submitted from thrad {} here:", e, clientThreadName, clientStack);
                throw e;
            }
        };
    }
 
    private Exception clientTrace() {
        return new Exception("Client stack trace");
    }
 
    @Override
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
        return tasks.stream().map(this::submit).collect(toList());
    }
 
    //...
 
}

這樣一旦失敗的話我們便可以取到完整的棧信息以及提交任務(wù)時所在的線程的名字。跟之前相比我們有了一些更有價值的信息:

Exception java.lang.NullPointerException in task submitted from thrad main here:
java.lang.Exception: Client stack trace
    at com.nurkiewicz.ExecutorServiceWithClientTrace.clientTrace(ExecutorServiceWithClientTrace.java:43) ~[classes/:na]
    at com.nurkiewicz.ExecutorServiceWithClientTrace.submit(ExecutorServiceWithClientTrace.java:28) ~[classes/:na]
    at com.nurkiewicz.Main.main(Main.java:31) ~[classes/:na]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0]
    at java.lang.reflect.Method.invoke(Method.java:483) ~[na:1.8.0]
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134) ~[idea_rt.jar:na]
  1. 優(yōu)先使用CompletableFuture

Java 8中引入了更為強大的CompletableFuture。有可能的話盡量使用下它。ExecutorService并沒有擴展以支持這個增強型的接口,因此你得自己動手了。這么寫是不行的了:

final Future<BigDecimal> future = 
    executorService.submit(this::calculate);

你得這樣:

final CompletableFuture<BigDecimal> future = 
    CompletableFuture.supplyAsync(this::calculate, executorService);

CompletableFuture 繼承自Future,因此跟之前的用法一樣。但是使用你接口的人一定會感謝CompletableFuture所提供的這些額外的功能的。

  1. 同步隊列

SynchronousQueue是一個非常有意思的BlockingQueue。它本身甚至都算不上是一個數(shù)據(jù)結(jié)構(gòu)。最好的解釋就是它是一個容量為0的隊列。這里引用下Java文檔中的一段話:

每一個insert操作都需要等待另一個線程的一個對應(yīng)的remove操作,反之亦然。同步隊列內(nèi)部不會有任何空間,甚至連一個位置也沒有。你無法對同步隊列執(zhí)行peek操作,因為僅當(dāng)你要移除一個元素的時候才存在這么個元素;如果沒有別的線程在嘗試移除一個元素你也無法往里面插入元素;你也無法對它進行遍歷,因為它什么都沒有。。。 同步隊列與CSP和Ada中所用到的集結(jié)管道(rendezvous channel)有異曲同工之妙。

它和線程池有什么關(guān)系?你可以試試在ThreadPoolExecutor中用下SynchronousQueue:

BlockingQueue<Runnable> queue = new SynchronousQueue<>();
ExecutorService executorService = new ThreadPoolExecutor(n, n,
        0L, TimeUnit.MILLISECONDS,
        queue);

我們創(chuàng)建了一個擁有兩個線程的線程池,以及一個SynchronousQueue。由于SynchronousQueue本質(zhì)上是一個容量為0的隊列,因此這個ExecutorService只有當(dāng)有空閑線程的時候才能接受新的任務(wù)。如果所有的線程都在忙,新的任務(wù)便會馬上被拒絕掉,不會進行等待。這在要么立即執(zhí)行,要么馬上丟棄的后臺執(zhí)行的場景中會非常有用。

終于講完了,希望你能找到一個自己感興趣的特性!

    本站是提供個人知識管理的網(wǎng)絡(luò)存儲空間,所有內(nèi)容均由用戶發(fā)布,不代表本站觀點。請注意甄別內(nèi)容中的聯(lián)系方式、誘導(dǎo)購買等信息,謹防詐騙。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請點擊一鍵舉報。
    轉(zhuǎn)藏 分享 獻花(0

    0條評論

    發(fā)表

    請遵守用戶 評論公約

    類似文章 更多