小男孩‘自慰网亚洲一区二区,亚洲一级在线播放毛片,亚洲中文字幕av每天更新,黄aⅴ永久免费无码,91成人午夜在线精品,色网站免费在线观看,亚洲欧洲wwwww在线观看

分享

Java 5.0多線程編程

 lwj888 2007-02-11
Java自1995年面世以來(lái)得到了廣泛得一個(gè)運(yùn)用,但是對(duì)多線程編程的支持Java很長(zhǎng)時(shí)間一直停留在初級(jí)階段。在Java 5.0之前Java里的多線程編程主要是通過(guò)Thread類,Runnable接口,Object對(duì)象中的wait()、 notify()、 notifyAll()等方法和synchronized關(guān)鍵詞來(lái)實(shí)現(xiàn)的。這些工具雖然能在大多數(shù)情況下解決對(duì)共享資源的管理和線程間的調(diào)度,但存在以下幾個(gè)問(wèn)題
1.      過(guò)于原始,拿來(lái)就能用的功能有限,即使是要實(shí)現(xiàn)簡(jiǎn)單的多線程功能也需要編寫大量的代碼。這些工具就像匯編語(yǔ)言一樣難以學(xué)習(xí)和使用,比這更糟糕的是稍有不慎它們還可能被錯(cuò)誤地使用,而且這樣的錯(cuò)誤很難被發(fā)現(xiàn)。
2.      如果使用不當(dāng),會(huì)使程序的運(yùn)行效率大大降低。
3.      為了提高開(kāi)發(fā)效率,簡(jiǎn)化編程,開(kāi)發(fā)人員在做項(xiàng)目的時(shí)候往往需要寫一些共享的工具來(lái)實(shí)現(xiàn)一些普遍適用的功能。但因?yàn)闆](méi)有規(guī)范,相同的工具會(huì)被重復(fù)地開(kāi)發(fā),造成資源浪費(fèi)。
4.      因?yàn)殒i定的功能是通過(guò)Synchronized來(lái)實(shí)現(xiàn)的,這是一種塊結(jié)構(gòu),只能對(duì)代碼中的一段代碼進(jìn)行鎖定,而且鎖定是單一的。如以下代碼所示:
synchronized(lock){
    //執(zhí)行對(duì)共享資源的操作
    ……
}
     一些復(fù)雜的功能就很難被實(shí)現(xiàn)。比如說(shuō)如果程序需要取得lock A和lock B來(lái)進(jìn)行操作1,然后需要取得lock C并且釋放lock A來(lái)進(jìn)行操作2,Java 5.0之前的多線程框架就顯得無(wú)能為力了。
   因?yàn)檫@些問(wèn)題,程序員對(duì)舊的框架一直頗有微詞。這種情況一直到Java 5.0才有較大的改觀,一系列的多線程工具包被納入了標(biāo)準(zhǔn)庫(kù)文件。這些工具包括了一個(gè)新的多線程程序的執(zhí)行框架,使編程人員可方便地協(xié)調(diào)和調(diào)度線程的運(yùn)行,并且新加入了一些高性能的常用的工具,使程序更容易編寫,運(yùn)行效率更高。本文將分類并結(jié)合例子來(lái)介紹這些新加的多線程工具。
   在我們開(kāi)始介紹Java 5.0里的新Concurrent工具前讓我們先來(lái)看一下一個(gè)用舊的多線程工具編寫的程序,這個(gè)程序里有一個(gè)Server線程,它需要啟動(dòng)兩個(gè) Component,Server線程需等到Component線程完畢后再繼續(xù)。相同的功能在Synchronizer一章里用新加的工具 CountDownLatch有相同的實(shí)現(xiàn)。兩個(gè)程序,孰優(yōu)孰劣,哪個(gè)程序更容易編寫,哪個(gè)程序更容易理解,相信大家看過(guò)之后不難得出結(jié)論。
public class ServerThread {
      Object concLock = new Object();
      int count = 2;
public void runTwoThreads() {
      //啟動(dòng)兩個(gè)線程去初始化組件
            new Thread(new ComponentThread1(this)).start();
            new Thread(new ComponentThread1(this)).start();
            // Wait for other thread
while(count != 0) {
                  synchronized(concLock) {
                        try {
                              concLock.wait();
                              System.out.println("Wake up.");
                        } catch (InterruptedException ie) { //處理異常}
                  }
            }
            System.out.println("Server is up.");
      }
      public void callBack() {
synchronized(concLock) {
                  count--;
                  concLock.notifyAll();
            }
      }
      public static void main(String[] args){
            ServerThread server = new ServerThread();
            server.runTwoThreads();
      }
}
 
public class ComponentThread1 implements Runnable {
      private ServerThread server;
      public ComponentThread1(ServerThread server) {
            this.server = server;
      }
public void run() {
      //做組件初始化的工作
            System.out.println("Do component initialization.");
            server.callBack();
      }
}
1:三個(gè)新加的多線程包
   Java 5.0里新加入了三個(gè)多線程包:java.util.concurrent, java.util.concurrent.atomic, java.util.concurrent.locks.
  • java.util.concurrent包含了常用的多線程工具,是新的多線程工具的主體。
  • java.util.concurrent.atomic 包含了不用加鎖情況下就能改變值的原子變量,比如說(shuō)AtomicInteger提供了addAndGet()方法。Add和Get是兩個(gè)不同的操作,為了保證別的線程不干擾,以往的做法是先鎖定共享的變量,然后在鎖定的范圍內(nèi)進(jìn)行兩步操作。但用AtomicInteger.addAndGet()就不用擔(dān)心鎖定的事了,其內(nèi)部實(shí)現(xiàn)保證了這兩步操作是在原子量級(jí)發(fā)生的,不會(huì)被別的線程干擾。
  • java.util.concurrent.locks包包含鎖定的工具。
2:Callable 和 Future接口
   Callable是類似于Runnable的接口,實(shí)現(xiàn)Callable接口的類和實(shí)現(xiàn)Runnable的類都是可被其它線程執(zhí)行的任務(wù)。Callable和Runnable有幾點(diǎn)不同:
  • Callable規(guī)定的方法是call(),而Runnable規(guī)定的方法是run().
  • Callable的任務(wù)執(zhí)行后可返回值,而Runnable的任務(wù)是不能返回值的。
  • call()方法可拋出異常,而run()方法是不能拋出異常的。
  • 運(yùn)行Callable任務(wù)可拿到一個(gè)Future對(duì)象,通過(guò)Future對(duì)象可了解任務(wù)執(zhí)行情況,可取消任務(wù)的執(zhí)行,還可獲取任務(wù)執(zhí)行的結(jié)果。
以下是Callable的一個(gè)例子:
public class DoCallStuff implements Callable{ // *1
        private int aInt;
        public DoCallStuff(int aInt) {
                this.aInt = aInt;
        }
        public String call() throws Exception { //*2
                boolean resultOk = false;
                if(aInt == 0){
                        resultOk = true;
                } else if(aInt == 1){
                        while(true){ //infinite loop
                                System.out.println("looping....");
                                Thread.sleep(3000);
                        }
                } else {
                        throw new Exception("Callable terminated with Exception!"); //*3
                }
                if(resultOk){
                        return "Task done.";
                } else {
                        return "Task failed";
                }
        }
}
*1: 名為DoCallStuff類實(shí)現(xiàn)了Callable,String將是call方法的返回值類型。例子中用了String,但可以是任何Java類。
*2: call方法的返回值類型為String,這是和類的定義相對(duì)應(yīng)的。并且可以拋出異常。
*3: call方法可以拋出異常,如加重的斜體字所示。
以下是調(diào)用DoCallStuff的主程序。
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class Executor {
        public static void main(String[] args){
                //*1
                DoCallStuff call1 = new DoCallStuff(0);
                DoCallStuff call2 = new DoCallStuff(1);
                DoCallStuff call3 = new DoCallStuff(2);
                //*2
                ExecutorService es = Executors.newFixedThreadPool(3);
                //*3
                Future future1 = es.submit(call1);
                Future future2 = es.submit(call2);
                Future future3 = es.submit(call3);
                try {
                        //*4
                        System.out.println(future1.get());
                         //*5
                        Thread.sleep(3000);
                        System.out.println("Thread 2 terminated? :" + future2.cancel(true));
                        //*6
                        System.out.println(future3.get());
                } catch (ExecutionException ex) {
                        ex.printStackTrace();
                } catch (InterruptedException ex) {
                        ex.printStackTrace();
                }
        }
}

 

*1: 定義了幾個(gè)任務(wù)
*2: 初始了任務(wù)執(zhí)行工具。任務(wù)的執(zhí)行框架將會(huì)在后面解釋。
*3: 執(zhí)行任務(wù),任務(wù)啟動(dòng)時(shí)返回了一個(gè)Future對(duì)象,如果想得到任務(wù)執(zhí)行的結(jié)果或者是異常可對(duì)這個(gè)Future對(duì)象進(jìn)行操作。Future所含的值必須跟Callable所含的值對(duì)映,比如說(shuō)例子中Future對(duì)印Callable
*4: 任務(wù)1正常執(zhí)行完畢,future1.get()會(huì)返回線程的值
*5: 任務(wù)2在進(jìn)行一個(gè)死循環(huán),調(diào)用future2.cancel(true)來(lái)中止此線程。傳入的參數(shù)標(biāo)明是否可打斷線程,true表明可以打斷。
*6: 任務(wù)3拋出異常,調(diào)用future3.get()時(shí)會(huì)引起異常的拋出。
 運(yùn)行Executor會(huì)有以下運(yùn)行結(jié)果:

looping....
Task done. //*1
looping....
looping....//*2
looping....
looping....
looping....
looping....
Thread 2 terminated? :true //*3
//*4
java.util.concurrent.ExecutionException: java.lang.Exception: Callable terminated with Exception!
        at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:205)
        at java.util.concurrent.FutureTask.get(FutureTask.java:80)
        at concurrent.Executor.main(Executor.java:43)
        …….

*1: 任務(wù)1正常結(jié)束
*2: 任務(wù)2是個(gè)死循環(huán),這是它的打印結(jié)果
*3: 指示任務(wù)2被取消
*4: 在執(zhí)行future3.get()時(shí)得到任務(wù)3拋出的異常
3:新的任務(wù)執(zhí)行架構(gòu)
   在Java 5.0之前啟動(dòng)一個(gè)任務(wù)是通過(guò)調(diào)用Thread類的start()方法來(lái)實(shí)現(xiàn)的,任務(wù)的提于交和執(zhí)行是同時(shí)進(jìn)行的,如果你想對(duì)任務(wù)的執(zhí)行進(jìn)行調(diào)度或是控制同時(shí)執(zhí)行的線程數(shù)量就需要額外編寫代碼來(lái)完成。5.0里提供了一個(gè)新的任務(wù)執(zhí)行架構(gòu)使你可以輕松地調(diào)度和控制任務(wù)的執(zhí)行,并且可以建立一個(gè)類似數(shù)據(jù)庫(kù)連接池的線程池來(lái)執(zhí)行任務(wù)。這個(gè)架構(gòu)主要有三個(gè)接口和其相應(yīng)的具體類組成。這三個(gè)接口是Executor, ExecutorService和ScheduledExecutorService,讓我們先用一個(gè)圖來(lái)顯示它們的關(guān)系:
 
  圖的左側(cè)是接口,圖的右側(cè)是這些接口的具體類。注意Executor是沒(méi)有直接具體實(shí)現(xiàn)的。
Executor接口:
是用來(lái)執(zhí)行Runnable任務(wù)的,它只定義一個(gè)方法:
  • execute(Runnable command):執(zhí)行Ruannable類型的任務(wù)
ExecutorService接口:
ExecutorService繼承了Executor的方法,并提供了執(zhí)行Callable任務(wù)和中止任務(wù)執(zhí)行的服務(wù),其定義的方法主要有:
  • submit(task):可用來(lái)提交Callable或Runnable任務(wù),并返回代表此任務(wù)的Future對(duì)象
  • invokeAll(collection of tasks):批處理任務(wù)集合,并返回一個(gè)代表這些任務(wù)的Future對(duì)象集合
  • shutdown():在完成已提交的任務(wù)后關(guān)閉服務(wù),不再接受新任務(wù)
  • shutdownNow():停止所有正在執(zhí)行的任務(wù)并關(guān)閉服務(wù)。
  • isTerminated():測(cè)試是否所有任務(wù)都執(zhí)行完畢了。
  • isShutdown():測(cè)試是否該ExecutorService已被關(guān)閉
ScheduledExecutorService接口
在ExecutorService的基礎(chǔ)上,ScheduledExecutorService提供了按時(shí)間安排執(zhí)行任務(wù)的功能,它提供的方法主要有:
  • schedule(task, initDelay): 安排所提交的Callable或Runnable任務(wù)在initDelay指定的時(shí)間后執(zhí)行。
  • scheduleAtFixedRate():安排所提交的Runnable任務(wù)按指定的間隔重復(fù)執(zhí)行
  • scheduleWithFixedDelay():安排所提交的Runnable任務(wù)在每次執(zhí)行完后,等待delay所指定的時(shí)間后重復(fù)執(zhí)行。
代碼:ScheduleExecutorService的例子

public class ScheduledExecutorServiceTest {
        public static void main(String[] args)
               throws InterruptedException, ExecutionException{
               //*1
                ScheduledExecutorService service = Executors.newScheduledThreadPool(2);
                //*2
                Runnable task1 = new Runnable() {
                     public void run() {
                        System.out.println("Task repeating.");
                     }
                };
                //*3
                final ScheduledFuture future1 =
                        service.scheduleAtFixedRate(task1, 0, 1, TimeUnit.SECONDS);
                //*4
                ScheduledFuture future2 = service.schedule(new Callable(){
                     public String call(){
                             future1.cancel(true);
                             return "task cancelled!";
                     }
                }, 5, TimeUnit.SECONDS);
                System.out.println(future2.get());
//*5
service.shutdown();
        }
}

   這個(gè)例子有兩個(gè)任務(wù),第一個(gè)任務(wù)每隔一秒打印一句“Task repeating”,第二個(gè)任務(wù)在5秒鐘后取消第一個(gè)任務(wù)。
*1: 初始化一個(gè)ScheduledExecutorService對(duì)象,這個(gè)對(duì)象的線程池大小為2。
*2: 用內(nèi)函數(shù)的方式定義了一個(gè)Runnable任務(wù)。
*3: 調(diào)用所定義的ScheduledExecutorService對(duì)象來(lái)執(zhí)行任務(wù),任務(wù)每秒執(zhí)行一次。能重復(fù)執(zhí)行的任務(wù)一定是Runnable類型。注意我們可以用TimeUnit來(lái)制定時(shí)間單位,這也是Java 5.0里新的特征,5.0以前的記時(shí)單位是微秒,現(xiàn)在可精確到奈秒。
*4: 調(diào)用ScheduledExecutorService對(duì)象來(lái)執(zhí)行第二個(gè)任務(wù),第二個(gè)任務(wù)所作的就是在5秒鐘后取消第一個(gè)任務(wù)。
*5: 關(guān)閉服務(wù)。
Executors類
   雖然以上提到的接口有其實(shí)現(xiàn)的具體類,但為了方便Java 5.0建議使用Executors的工具類來(lái)得到Executor接口的具體對(duì)象,需要注意的是Executors是一個(gè)類,不是Executor的復(fù)數(shù)形式。Executors提供了以下一些static的方法:
  • callable(Runnable task): 將Runnable的任務(wù)轉(zhuǎn)化成Callable的任務(wù)
  • newSingleThreadExecutor: 產(chǎn)生一個(gè)ExecutorService對(duì)象,這個(gè)對(duì)象只有一個(gè)線程可用來(lái)執(zhí)行任務(wù),若任務(wù)多于一個(gè),任務(wù)將按先后順序執(zhí)行。
  • newCachedThreadPool(): 產(chǎn)生一個(gè)ExecutorService對(duì)象,這個(gè)對(duì)象帶有一個(gè)線程池,線程池的大小會(huì)根據(jù)需要調(diào)整,線程執(zhí)行完任務(wù)后返回線程池,供執(zhí)行下一次任務(wù)使用。
  • newFixedThreadPool(int poolSize):產(chǎn)生一個(gè)ExecutorService對(duì)象,這個(gè)對(duì)象帶有一個(gè)大小為poolSize的線程池,若任務(wù)數(shù)量大于poolSize,任務(wù)會(huì)被放在一個(gè)queue里順序執(zhí)行。
  • newSingleThreadScheduledExecutor:產(chǎn)生一個(gè)ScheduledExecutorService對(duì)象,這個(gè)對(duì)象的線程池大小為1,若任務(wù)多于一個(gè),任務(wù)將按先后順序執(zhí)行。
  • newScheduledThreadPool(int poolSize): 產(chǎn)生一個(gè)ScheduledExecutorService對(duì)象,這個(gè)對(duì)象的線程池大小為poolSize,若任務(wù)數(shù)量大于poolSize,任務(wù)會(huì)在一個(gè)queue里等待執(zhí)行
以下是得到和使用ExecutorService的例子:
代碼:如何調(diào)用Executors來(lái)獲得各種服務(wù)對(duì)象

//Single Threaded ExecutorService
     ExecutorService singleThreadeService = Executors.newSingleThreadExecutor();
//Cached ExecutorService
     ExecutorService cachedService = Executors.newCachedThreadPool();
//Fixed number of ExecutorService
     ExecutorService fixedService = Executors.newFixedThreadPool(3);
//Single ScheduledExecutorService
     ScheduledExecutorService singleScheduledService =
          Executors.newSingleThreadScheduledExecutor();
//Fixed number of ScheduledExecutorService
ScheduledExecutorService fixedScheduledService =
     Executors.newScheduledThreadPool(3);

4:Lockers和Condition接口
   在多線程編程里面一個(gè)重要的概念是鎖定,如果一個(gè)資源是多個(gè)線程共享的,為了保證數(shù)據(jù)的完整性,在進(jìn)行事務(wù)性操作時(shí)需要將共享資源鎖定,這樣可以保證在做事務(wù)性操作時(shí)只有一個(gè)線程能對(duì)資源進(jìn)行操作,從而保證數(shù)據(jù)的完整性。在5.0以前,鎖定的功能是由Synchronized關(guān)鍵字來(lái)實(shí)現(xiàn)的,這樣做存在幾個(gè)問(wèn)題:
  • 每次只能對(duì)一個(gè)對(duì)象進(jìn)行鎖定。若需要鎖定多個(gè)對(duì)象,編程就比較麻煩,一不小心就會(huì)出現(xiàn)死鎖現(xiàn)象。
  • 如果線程因拿不到鎖定而進(jìn)入等待狀況,是沒(méi)有辦法將其打斷的
在Java 5.0里出現(xiàn)兩種鎖的工具可供使用,下圖是這兩個(gè)工具的接口及其實(shí)現(xiàn):
Lock接口
ReentrantLock是Lock的具體類,Lock提供了以下一些方法:
  • lock(): 請(qǐng)求鎖定,如果鎖已被別的線程鎖定,調(diào)用此方法的線程被阻斷進(jìn)入等待狀態(tài)。
  • tryLock():如果鎖沒(méi)被別的線程鎖定,進(jìn)入鎖定狀態(tài),并返回true。若鎖已被鎖定,返回false,不進(jìn)入等待狀態(tài)。此方法還可帶時(shí)間參數(shù),如果鎖在方法執(zhí)行時(shí)已被鎖定,線程將繼續(xù)等待規(guī)定的時(shí)間,若還不行才返回false。
  • unlock():取消鎖定,需要注意的是Lock不會(huì)自動(dòng)取消,編程時(shí)必須手動(dòng)解鎖。
代碼:
//生成一個(gè)鎖
Lock lock = new ReentrantLock();
public void accessProtectedResource() {
 lock.lock(); //取得鎖定
 try {
    //對(duì)共享資源進(jìn)行操作
 } finally {
    //一定記著把鎖取消掉,鎖本身是不會(huì)自動(dòng)解鎖的
    lock.unlock();
 }
}
ReadWriteLock接口
   為了提高效率有些共享資源允許同時(shí)進(jìn)行多個(gè)讀的操作,但只允許一個(gè)寫的操作,比如一個(gè)文件,只要其內(nèi)容不變可以讓多個(gè)線程同時(shí)讀,不必做排他的鎖定,排他的鎖定只有在寫的時(shí)候需要,以保證別的線程不會(huì)看到數(shù)據(jù)不完整的文件。ReadWriteLock可滿足這種需要。ReadWriteLock內(nèi)置兩個(gè) Lock,一個(gè)是讀的Lock,一個(gè)是寫的Lock。多個(gè)線程可同時(shí)得到讀的Lock,但只有一個(gè)線程能得到寫的Lock,而且寫的Lock被鎖定后,任何線程都不能得到Lock。ReadWriteLock提供的方法有:
  • readLock(): 返回一個(gè)讀的lock
  • writeLock(): 返回一個(gè)寫的lock, 此lock是排他的。
ReadWriteLock的例子:
public class FileOperator{
      //初始化一個(gè)ReadWriteLock
      ReadWriteLock lock = new ReentrantReadWriteLock();
public String read() {
      //得到readLock并鎖定
            Lock readLock = lock.readLock();
            readLock.lock();
            try {
                  //做讀的工作
                  return "Read something";
            } finally {
                 readLock.unlock();
            }
      }
     
public void write(String content) {
      //得到writeLock并鎖定
            Lock writeLock = lock.writeLock();
            writeLock.lock();
            try {
                  //做讀的工作
            } finally {
                 writeLock.unlock();
            }
      }
}
 
   需要注意的是ReadWriteLock提供了一個(gè)高效的鎖定機(jī)理,但最終程序的運(yùn)行效率是和程序的設(shè)計(jì)息息相關(guān)的,比如說(shuō)如果讀的線程和寫的線程同時(shí)在等待,要考慮是先發(fā)放讀的lock還是先發(fā)放寫的lock。如果寫發(fā)生的頻率不高,而且快,可以考慮先給寫的lock。還要考慮的問(wèn)題是如果一個(gè)寫正在等待讀完成,此時(shí)一個(gè)新的讀進(jìn)來(lái),是否要給這個(gè)新的讀發(fā)鎖,如果發(fā)了,可能導(dǎo)致寫的線程等很久。等等此類問(wèn)題在編程時(shí)都要給予充分的考慮。
Condition接口:
   有時(shí)候線程取得lock后需要在一定條件下才能做某些工作,比如說(shuō)經(jīng)典的Producer和Consumer問(wèn)題,Consumer必須在籃子里有蘋果的時(shí)候才能吃蘋果,否則它必須暫時(shí)放棄對(duì)籃子的鎖定,等到Producer往籃子里放了蘋果后再去拿來(lái)吃。而Producer必須等到籃子空了才能往里放蘋果,否則它也需要暫時(shí)解鎖等Consumer把蘋果吃了才能往籃子里放蘋果。在Java 5.0以前,這種功能是由Object類的wait(), notify()和notifyAll()等方法實(shí)現(xiàn)的,在5.0里面,這些功能集中到了Condition這個(gè)接口來(lái)實(shí)現(xiàn),Condition提供以下方法:
  • await():使調(diào)用此方法的線程放棄鎖定,進(jìn)入睡眠直到被打斷或被喚醒。
  • signal(): 喚醒一個(gè)等待的線程
  • signalAll():?jiǎn)拘阉械却木€程
Condition的例子:
public class Basket {     
Lock lock = new ReentrantLock();
//產(chǎn)生Condition對(duì)象
     Condition produced = lock.newCondition();
     Condition consumed = lock.newCondition();
     boolean available = false;
     
     public void produce() throws InterruptedException {
           lock.lock();
           try {
                 if(available){
                    consumed.await(); //放棄lock進(jìn)入睡眠 
                 }
                 /*生產(chǎn)蘋果*/
                 System.out.println("Apple produced.");
                 available = true;
                 produced.signal(); //發(fā)信號(hào)喚醒等待這個(gè)Condition的線程
           } finally {
                 lock.unlock();
           }
     }
    
     public void consume() throws InterruptedException {
           lock.lock();
           try {
                 if(!available){
                       produced.await();//放棄lock進(jìn)入睡眠 
                 }
                 /*吃蘋果*/
                 System.out.println("Apple consumed.");
                 available = false;
                 consumed.signal();//發(fā)信號(hào)喚醒等待這個(gè)Condition的線程
           } finally {
                 lock.unlock();
           }
     }     
}
ConditionTester:
public class ConditionTester {
     
      public static void main(String[] args) throws InterruptedException{
final Basket basket = new Basket();
//定義一個(gè)producer
            Runnable producer = new Runnable() {
                  public void run() {
                        try {
                              basket.produce();
                        } catch (InterruptedException ex) {
                              ex.printStackTrace();
                        }
                  }
};
//定義一個(gè)consumer
            Runnable consumer = new Runnable() {
                  public void run() {
                        try {
                              basket.consume();
                        } catch (InterruptedException ex) {
                              ex.printStackTrace();
                        }
                  }
};
//各產(chǎn)生10個(gè)consumer和producer
            ExecutorService service = Executors.newCachedThreadPool();
            for(int i=0; i < 10; i++)
                  service.submit(consumer);
            Thread.sleep(2000);
            for(int i=0; i<10; i++)
                  service.submit(producer);
            service.shutdown();
      }     
}
5: Synchronizer:同步裝置
   Java 5.0里新加了4個(gè)協(xié)調(diào)線程間進(jìn)程的同步裝置,它們分別是Semaphore, CountDownLatch, CyclicBarrier和Exchanger.
Semaphore:
   用來(lái)管理一個(gè)資源池的工具,Semaphore可以看成是個(gè)通行證,線程要想從資源池拿到資源必須先拿到通行證,Semaphore提供的通行證數(shù)量和資源池的大小一致。如果線程暫時(shí)拿不到通行證,線程就會(huì)被阻斷進(jìn)入等待狀態(tài)。以下是一個(gè)例子:
public class Pool {
      ArrayList pool = null;
      Semaphore pass = null;
      public Pool(int size){
            //初始化資源池
            pool = new ArrayList();
            for(int i=0; i
                  pool.add("Resource "+i);
            }
            //Semaphore的大小和資源池的大小一致
            pass = new Semaphore(size);
      }
      public String get() throws InterruptedException{
            //獲取通行證,只有得到通行證后才能得到資源
            pass.acquire();
            return getResource();
      }
      public void put(String resource){
            //歸還通行證,并歸還資源
            pass.release();
            releaseResource(resource);
      }
     private synchronized String getResource() {
            String result = pool.get(0);
            pool.remove(0);
            System.out.println("Give out "+result);
            return result;
      }
      private synchronized void releaseResource(String resource) {
            System.out.println("return "+resource);
            pool.add(resource);
      }
}
SemaphoreTest:
public class SemaphoreTest {
      public static void main(String[] args){
            final Pool aPool = new Pool(2);
            Runnable worker = new Runnable() {
                  public void run() {
                        String resource = null;
                        try {
                              //取得resource
                              resource = aPool.get();
                        } catch (InterruptedException ex) {
                              ex.printStackTrace();
                        }
                        //用resource做工作
                        System.out.println("I worked on "+resource);
                        //歸還resource
                        aPool.put(resource);
                  }
            };
            ExecutorService service = Executors.newCachedThreadPool();
            for(int i=0; i<20; i++){
                  service.submit(worker);
            }
            service.shutdown();
      }    
}

CountDownLatch:
   CountDownLatch是個(gè)計(jì)數(shù)器,它有一個(gè)初始數(shù),等待這個(gè)計(jì)數(shù)器的線程必須等到計(jì)數(shù)器倒數(shù)到零時(shí)才可繼續(xù)。比如說(shuō)一個(gè)Server啟動(dòng)時(shí)需要初始化4個(gè)部件,Server可以同時(shí)啟動(dòng)4個(gè)線程去初始化這4個(gè)部件,然后調(diào)用CountDownLatch(4).await()阻斷進(jìn)入等待,每個(gè)線程完成任務(wù)后會(huì)調(diào)用一次CountDownLatch.countDown()來(lái)倒計(jì)數(shù), 當(dāng)4個(gè)線程都結(jié)束時(shí)CountDownLatch的計(jì)數(shù)就會(huì)降低為0,此時(shí)Server就會(huì)被喚醒繼續(xù)下一步操作。CountDownLatch的方法主要有:
  • await():使調(diào)用此方法的線程阻斷進(jìn)入等待
  • countDown(): 倒計(jì)數(shù),將計(jì)數(shù)值減1
  • getCount(): 得到當(dāng)前的計(jì)數(shù)值
   CountDownLatch的例子:一個(gè)server調(diào)了三個(gè)ComponentThread分別去啟動(dòng)三個(gè)組件,然后server等到組件都啟動(dòng)了再繼續(xù)。
public class Server {
      public static void main(String[] args) throws InterruptedException{
            System.out.println("Server is starting.");
            //初始化一個(gè)初始值為3的CountDownLatch
            CountDownLatch latch = new CountDownLatch(3);
            //起3個(gè)線程分別去啟動(dòng)3個(gè)組件
            ExecutorService service = Executors.newCachedThreadPool();
            service.submit(new ComponentThread(latch, 1));
            service.submit(new ComponentThread(latch, 2));
            service.submit(new ComponentThread(latch, 3));
            service.shutdown();
            //進(jìn)入等待狀態(tài)
            latch.await();
            //當(dāng)所需的三個(gè)組件都完成時(shí),Server就可繼續(xù)了
            System.out.println("Server is up!");
      }
}
 
public class ComponentThread implements Runnable{
      CountDownLatch latch;
      int ID;
      /** Creates a new instance of ComponentThread */
      public ComponentThread(CountDownLatch latch, int ID) {
            this.latch = latch;
            this.ID = ID;
      }
      public void run() {
            System.out.println("Component "+ID + " initialized!");
            //將計(jì)數(shù)減一
            latch.countDown();
      }    
}
運(yùn)行結(jié)果:
Server is starting.
Component 1 initialized!
Component 3 initialized!
Component 2 initialized!
Server is up!
CyclicBarrier:
   CyclicBarrier類似于CountDownLatch也是個(gè)計(jì)數(shù)器,不同的是CyclicBarrier數(shù)的是調(diào)用了 CyclicBarrier.await()進(jìn)入等待的線程數(shù),當(dāng)線程數(shù)達(dá)到了CyclicBarrier初始時(shí)規(guī)定的數(shù)目時(shí),所有進(jìn)入等待狀態(tài)的線程被喚醒并繼續(xù)。CyclicBarrier就象它名字的意思一樣,可看成是個(gè)障礙,所有的線程必須到齊后才能一起通過(guò)這個(gè)障礙。CyclicBarrier 初始時(shí)還可帶一個(gè)Runnable的參數(shù),此Runnable任務(wù)在CyclicBarrier的數(shù)目達(dá)到后,所有其它線程被喚醒前被執(zhí)行。
CyclicBarrier提供以下幾個(gè)方法:
  • await():進(jìn)入等待
  • getParties():返回此barrier需要的線程數(shù)
  • reset():將此barrier重置
   以下是使用CyclicBarrier的一個(gè)例子:兩個(gè)線程分別在一個(gè)數(shù)組里放一個(gè)數(shù),當(dāng)這兩個(gè)線程都結(jié)束后,主線程算出數(shù)組里的數(shù)的和(這個(gè)例子比較無(wú)聊,我沒(méi)有想到更合適的例子)
public class MainThread {
public static void main(String[] args)
      throws InterruptedException, BrokenBarrierException, TimeoutException{
            final int[] array = new int[2];
            CyclicBarrier barrier = new CyclicBarrier(2,
                  new Runnable() {//在所有線程都到達(dá)Barrier時(shí)執(zhí)行
                  public void run() {
                        System.out.println("Total is:"+(array[0]+array[1]));
                  }
            });           
            //啟動(dòng)線程
            new Thread(new ComponentThread(barrier, array, 0)).start();
            new Thread(new ComponentThread(barrier, array, 1)).start();   
      }     
}
 
public class ComponentThread implements Runnable{
      CyclicBarrier barrier;
      int ID;
      int[] array;
      public ComponentThread(CyclicBarrier barrier, int[] array, int ID) {
            this.barrier = barrier;
            this.ID = ID;
            this.array = array;
      }
      public void run() {
            try {
                  array[ID] = new Random().nextInt();
                  System.out.println(ID+ " generates:"+array[ID]);
                  //該線程完成了任務(wù)等在Barrier處
                  barrier.await();
            } catch (BrokenBarrierException ex) {
                  ex.printStackTrace();
            } catch (InterruptedException ex) {
                  ex.printStackTrace();
            }
      }
}
Exchanger:
   顧名思義Exchanger讓兩個(gè)線程可以互換信息。用一個(gè)例子來(lái)解釋比較容易。例子中服務(wù)生線程往空的杯子里倒水,顧客線程從裝滿水的杯子里喝水,然后通過(guò)Exchanger雙方互換杯子,服務(wù)生接著往空杯子里倒水,顧客接著喝水,然后交換,如此周而復(fù)始。
class FillAndEmpty {
      //初始化一個(gè)Exchanger,并規(guī)定可交換的信息類型是DataCup
      Exchanger exchanger = new Exchanger();
      Cup initialEmptyCup = ...; //初始化一個(gè)空的杯子
      Cup initialFullCup = ...; //初始化一個(gè)裝滿水的杯子
      //服務(wù)生線程
      class Waiter implements Runnable {
            public void run() {
                  Cup currentCup = initialEmptyCup;
                  try {
                        //往空的杯子里加水
                        currentCup.addWater();
                        //杯子滿后和顧客的空杯子交換
                        currentCup = exchanger.exchange(currentCup);
                  } catch (InterruptedException ex) { ... handle ... }
            }
      }
      //顧客線程
      class Customer implements Runnable {
            public void run() {
                  DataCup currentCup = initialFullCup;
                  try {
                        //把杯子里的水喝掉
                        currentCup.drinkFromCup();
                        //將空杯子和服務(wù)生的滿杯子交換
                        currentCup = exchanger.exchange(currentCup);
                  } catch (InterruptedException ex) { ... handle ...}
            }
      }
     
      void start() {
            new Thread(new Waiter()).start();
            new Thread(new Customer()).start();
      }
}
6: BlockingQueue接口
   BlockingQueue是一種特殊的Queue,若BlockingQueue是空的,從BlockingQueue取東西的操作將會(huì)被阻斷進(jìn)入等待狀態(tài)直到BlocingkQueue進(jìn)了新貨才會(huì)被喚醒。同樣,如果BlockingQueue是滿的任何試圖往里存東西的操作也會(huì)被阻斷進(jìn)入等待狀態(tài),直到BlockingQueue里有新的空間才會(huì)被喚醒繼續(xù)操作。BlockingQueue提供的方法主要有:
  • add(anObject): 把a(bǔ)nObject加到BlockingQueue里,如果BlockingQueue可以容納返回true,否則拋出IllegalStateException異常。
  • offer(anObject):把a(bǔ)nObject加到BlockingQueue里,如果BlockingQueue可以容納返回true,否則返回false。
  • put(anObject):把a(bǔ)nObject加到BlockingQueue里,如果BlockingQueue沒(méi)有空間,調(diào)用此方法的線程被阻斷直到BlockingQueue里有新的空間再繼續(xù)。
  • poll(time):取出BlockingQueue里排在首位的對(duì)象,若不能立即取出可等time參數(shù)規(guī)定的時(shí)間。取不到時(shí)返回null。
  • take():取出BlockingQueue里排在首位的對(duì)象,若BlockingQueue為空,阻斷進(jìn)入等待狀態(tài)直到BlockingQueue有新的對(duì)象被加入為止。
根據(jù)不同的需要BlockingQueue有4種具體實(shí)現(xiàn):
  • ArrayBlockingQueue:規(guī)定大小的BlockingQueue,其構(gòu)造函數(shù)必須帶一個(gè)int參數(shù)來(lái)指明其大小。其所含的對(duì)象是以FIFO(先入先出)順序排序的。
  • LinkedBlockingQueue:大小不定的BlockingQueue,若其構(gòu)造函數(shù)帶一個(gè)規(guī)定大小的參數(shù),生成的BlockingQueue有大小限制,若不帶大小參數(shù),所生成的 BlockingQueue的大小由Integer.MAX_VALUE來(lái)決定。其所含的對(duì)象是以FIFO(先入先出)順序排序的。 LinkedBlockingQueue和ArrayBlockingQueue比較起來(lái),它們背后所用的數(shù)據(jù)結(jié)構(gòu)不一樣,導(dǎo)致 LinkedBlockingQueue的數(shù)據(jù)吞吐量要大于ArrayBlockingQueue,但在線程數(shù)量很大時(shí)其性能的可預(yù)見(jiàn)性低于 ArrayBlockingQueue。
  • PriorityBlockingQueue:類似于LinkedBlockingQueue,但其所含對(duì)象的排序不是FIFO,而是依據(jù)對(duì)象的自然排序順序或者是構(gòu)造函數(shù)所帶的Comparator決定的順序。
  • SynchronousQueue:特殊的BlockingQueue,對(duì)其的操作必須是放和取交替完成的。
下面是用BlockingQueue來(lái)實(shí)現(xiàn)Producer和Consumer的例子:
public class BlockingQueueTest {
      static BlockingQueue basket;
      public BlockingQueueTest() {
            //定義了一個(gè)大小為2的BlockingQueue,也可根據(jù)需要用其他的具體類
            basket = new ArrayBlockingQueue(2);
      }
      class Producor implements Runnable {
            public void run() {
                  while(true){
                        try {
                              //放入一個(gè)對(duì)象,若basket滿了,等到basket有位置
                              basket.put("An apple");
                        } catch (InterruptedException ex) {
                              ex.printStackTrace();
                        }
                  }
            }
      }
      class Consumer implements Runnable {
           public void run() {
                  while(true){
                        try {
                              //取出一個(gè)對(duì)象,若basket為空,等到basket有東西為止
                              String result = basket.take();
                        } catch (InterruptedException ex) {
                              ex.printStackTrace();
                        }
                  }
            }           
      }
      public void execute(){
            for(int i=0; i<10; i++){
                  new Thread(new Producor()).start();
                  new Thread(new Consumer()).start();
            }           
      }
      public static void main(String[] args){
            BlockingQueueTest test = new BlockingQueueTest();
            test.execute();
      }     
}
7:Atomics 原子級(jí)變量
   原子量級(jí)的變量,主要的類有AtomicBoolean, AtomicInteger, AotmicIntegerArray, AtomicLong, AtomicLongArray, AtomicReference ……。這些原子量級(jí)的變量主要提供兩個(gè)方法:
  • compareAndSet(expectedValue, newValue): 比較當(dāng)前的值是否等于expectedValue,若等于把當(dāng)前值改成newValue,并返回true。若不等,返回false。
  • getAndSet(newValue): 把當(dāng)前值改為newValue,并返回改變前的值。
   這些原子級(jí)變量利用了現(xiàn)代處理器(CPU)的硬件支持可把兩步操作合為一步的功能,避免了不必要的鎖定,提高了程序的運(yùn)行效率。
8:Concurrent Collections 共點(diǎn)聚集
   在Java的聚集框架里可以調(diào)用Collections.synchronizeCollection(aCollection)將普通聚集改變成同步聚集,使之可用于多線程的環(huán)境下。 但同步聚集在一個(gè)時(shí)刻只允許一個(gè)線程訪問(wèn)它,其它想同時(shí)訪問(wèn)它的線程會(huì)被阻斷,導(dǎo)致程序運(yùn)行效率不高。Java 5.0里提供了幾個(gè)共點(diǎn)聚集類,它們把以前需要幾步才能完成的操作合成一個(gè)原子量級(jí)的操作,這樣就可讓多個(gè)線程同時(shí)對(duì)聚集進(jìn)行操作,避免了鎖定,從而提高了程序的運(yùn)行效率。Java 5.0目前提供的共點(diǎn)聚集類有:ConcurrentHashMap, ConcurrentLinkedQueue, CopyOnWriteArrayList和CopyOnWriteArraySet.

    本站是提供個(gè)人知識(shí)管理的網(wǎng)絡(luò)存儲(chǔ)空間,所有內(nèi)容均由用戶發(fā)布,不代表本站觀點(diǎn)。請(qǐng)注意甄別內(nèi)容中的聯(lián)系方式、誘導(dǎo)購(gòu)買等信息,謹(jǐn)防詐騙。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請(qǐng)點(diǎn)擊一鍵舉報(bào)。
    轉(zhuǎn)藏 分享 獻(xiàn)花(0

    0條評(píng)論

    發(fā)表

    請(qǐng)遵守用戶 評(píng)論公約

    類似文章 更多