小男孩‘自慰网亚洲一区二区,亚洲一级在线播放毛片,亚洲中文字幕av每天更新,黄aⅴ永久免费无码,91成人午夜在线精品,色网站免费在线观看,亚洲欧洲wwwww在线观看

分享

深入理解AbstractQueuedSynchronizer(一)

 liang1234_ 2019-01-30

博客鏈接:http://www./2017/03/15/深入理解AbstractQueuedSynchronizer(一)/


AbstractQueuedSynchronizer簡介

AbstractQueuedSynchronizer提供了一個FIFO隊(duì)列,可以看做是一個可以用來實(shí)現(xiàn)鎖以及其他需要同步功能的框架。這里簡稱該類為AQS。AQS的使用依靠繼承來完成,子類通過繼承自AQS并實(shí)現(xiàn)所需的方法來管理同步狀態(tài)。例如ReentrantLock,CountDownLatch等。

本篇文章基于JDK1.8來介紹,該類有許多實(shí)現(xiàn)類:

其中,我們最常用的大概就是ReentrantLock和CountDownLatch了。ReentrantLock提供了對代碼塊的并發(fā)訪問控制,也就是鎖,說是鎖,但其實(shí)并沒有用到關(guān)鍵字synchronized,這么神奇?其實(shí)其內(nèi)部就是基于同步器來實(shí)現(xiàn)的,本文結(jié)合ReentrantLock的使用來分析同步器獨(dú)占鎖的原理。

AQS的兩種功能

從使用上來說,AQS的功能可以分為兩種:獨(dú)占和共享。對于這兩種功能,有一個很常用的類:ReentrantReadWriteLock,其就是通過兩個內(nèi)部類來分別實(shí)現(xiàn)了這兩種功能,提供了讀鎖和寫鎖的功能。但子類實(shí)現(xiàn)時,只能實(shí)現(xiàn)其中的一種功能,即要么是獨(dú)占功能,要么是共享功能。

對于獨(dú)占功能,例如如下代碼:

ReentrantLock lock = new ReentrantLock(); ... public void function(){ lock.lock(); try { // do something... } finally { lock.unlock(); } }

這個很好理解,通過ReentrantLock來保證在lock.lock()之后的代碼在同一時刻只能有一個線程來執(zhí)行,其余的線程將會被阻塞,直到該線程執(zhí)行了lock.unlock()。這就是一個獨(dú)占鎖的功能。

對于共享功能,例如如下代碼:

ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
...

public void function(){
    
    lock.readLock().lock();
    try {
        
    // do something...
        
        
    } finally {
        lock.readLock().unlock();
    }
    
}

代碼中的lockReentrantReadWriteLock類的實(shí)例,而lock.readLock()為獲取其中的讀鎖,即共享鎖,使用方式并無差別,但和獨(dú)占鎖是有區(qū)別的:

  • 讀鎖與讀鎖可以共享
  • 讀鎖與寫鎖不可以共享(排他)
  • 寫鎖與寫鎖不可以共享(排他)

AQS獨(dú)占鎖的內(nèi)部實(shí)現(xiàn)

AQS的主要數(shù)據(jù)結(jié)構(gòu)

由于使用AQS可以實(shí)現(xiàn)鎖的功能,那么下面就要分析一下究竟是如何實(shí)現(xiàn)的。

AQS內(nèi)部維護(hù)著一個FIFO的隊(duì)列,該隊(duì)列就是用來實(shí)現(xiàn)線程的并發(fā)訪問控制。隊(duì)列中的元素是一個Node類型的節(jié)點(diǎn),Node的主要屬性如下:

static final class Node { int waitStatus; Node prev; Node next; Node nextWaiter; Thread thread; }
  • waitStatus:表示節(jié)點(diǎn)的狀態(tài),其中包含的狀態(tài)有:
    • CANCELLED:值為1,表示當(dāng)前節(jié)點(diǎn)被取消;
    • SIGNAL:值為-1,表示當(dāng)前節(jié)點(diǎn)的的后繼節(jié)點(diǎn)將要或者已經(jīng)被阻塞,在當(dāng)前節(jié)點(diǎn)釋放的時候需要unpark后繼節(jié)點(diǎn);
    • CONDITION:值為-2,表示當(dāng)前節(jié)點(diǎn)在等待condition,即在condition隊(duì)列中;
    • PROPAGATE:值為-3,表示releaseShared需要被傳播給后續(xù)節(jié)點(diǎn)(僅在共享模式下使用);
    • 0:無狀態(tài),表示當(dāng)前節(jié)點(diǎn)在隊(duì)列中等待獲取鎖。
  • prev:前繼節(jié)點(diǎn);
  • next:后繼節(jié)點(diǎn);
  • nextWaiter:存儲condition隊(duì)列中的后繼節(jié)點(diǎn);
  • thread:當(dāng)前線程。

其中,隊(duì)列里還有一個head節(jié)點(diǎn)和一個tail節(jié)點(diǎn),分別表示頭結(jié)點(diǎn)和尾節(jié)點(diǎn),其中頭結(jié)點(diǎn)不存儲Thread,僅保存next結(jié)點(diǎn)的引用。

AQS中有一個state變量,該變量對不同的子類實(shí)現(xiàn)具有不同的意義,對ReentrantLock來說,它表示加鎖的狀態(tài):

  • 無鎖時state=0,有鎖時state>0;
  • 第一次加鎖時,將state設(shè)置為1;
  • 由于ReentrantLock是可重入鎖,所以持有鎖的線程可以多次加鎖,經(jīng)過判斷加鎖線程就是當(dāng)前持有鎖的線程時(即exclusiveOwnerThread==Thread.currentThread()),即可加鎖,每次加鎖都會將state的值 1,state等于幾,就代表當(dāng)前持有鎖的線程加了幾次鎖;
  • 解鎖時每解一次鎖就會將state減1,state減到0后,鎖就被釋放掉,這時其它線程可以加鎖;
  • 當(dāng)持有鎖的線程釋放鎖以后,如果是等待隊(duì)列獲取到了加鎖權(quán)限,則會在等待隊(duì)列頭部取出第一個線程去獲取鎖,獲取鎖的線程會被移出隊(duì)列;

state變量定義如下:

/**
 * The synchronization state.
 */
private volatile int state;

ReentrantLock類的結(jié)構(gòu)

下面通過ReentrantLock的實(shí)現(xiàn)進(jìn)一步分析重入鎖的實(shí)現(xiàn)。

首先看一下lock方法:

public void lock() { sync.lock(); }

該方法調(diào)用了sync實(shí)例的lock方法,這里要說明一下ReentrantLock中的幾個內(nèi)部類:

  • Sync
  • FairSync
  • NonfairSync

對于ReentrantLock,有兩種獲取鎖的模式:公平鎖和非公平鎖。所以對應(yīng)有兩個內(nèi)部類,都繼承自Sync。而Sync繼承自AQS:

本文主要通過公平鎖來介紹,看一下FairSync的定義:

/**
 * Sync object for fair locks
 */
static final class FairSync extends Sync {
    private static final long serialVersionUID = -3000897897090466540L;

    final void lock() {
        acquire(1);
    }

    /**
     * Fair version of tryAcquire.  Don't grant access unless
     * recursive call or no waiters or is first.
     */
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        // 獲取state
        int c = getState();
        // state=0表示當(dāng)前隊(duì)列中沒有線程被加鎖
        if (c == 0) {
            /*
             * 首先判斷是否有前繼結(jié)點(diǎn),如果沒有則當(dāng)前隊(duì)列中還沒有其他線程;
             * 設(shè)置狀態(tài)為acquires,即lock方法中寫死的1(這里為什么不直接setState?因?yàn)榭赡芡瑫r有多個線程同時在執(zhí)行到此處,所以用CAS來執(zhí)行);
             * 設(shè)置當(dāng)前線程獨(dú)占鎖。
             */
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        /*
         * 如果state不為0,表示已經(jīng)有線程獨(dú)占鎖了,這時還需要判斷獨(dú)占鎖的線程是否是當(dāng)前的線程,原因是由于ReentrantLock為可重入鎖;
         * 如果獨(dú)占鎖的線程是當(dāng)前線程,則將狀態(tài)加1,并setState;
         * 這里為什么不用compareAndSetState?因?yàn)楠?dú)占鎖的線程已經(jīng)是當(dāng)前線程,不需要通過CAS來設(shè)置。
         */
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c   acquires;
            if (nextc < 0)
                throw new Error('Maximum lock count exceeded');
            setState(nextc);
            return true;
        }
        return false;
    }
}

AQS獲取獨(dú)占鎖的實(shí)現(xiàn)

acquire方法

acquire是AQS中的方法,代碼如下:

public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }

該方法主要工作如下:

  1. 嘗試獲取獨(dú)占鎖;
  2. 獲取成功則返回,否則執(zhí)行步驟3;
  3. addWaiter方法將當(dāng)前線程封裝成Node對象,并添加到隊(duì)列尾部;
  4. 自旋獲取鎖,并判斷中斷標(biāo)志位。如果中斷標(biāo)志位為true,執(zhí)行步驟5,否則返回;
  5. 設(shè)置線程中斷。

tryAcquire方法

tryAcquire方法在FairSync中已經(jīng)說明,它重寫了AQS中的方法,在AQS中它的定義如下:

protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}

既然該方法需要子類來實(shí)現(xiàn),為什么不使用abstract來修飾呢?上文中提到過,AQS有兩種功能:獨(dú)占和共享,而且子類只能實(shí)現(xiàn)其一種功能,所以,如果使用abstract來修飾,那么每個子類都需要同時實(shí)現(xiàn)兩種功能的方法,這對子類來說不太友好,所以沒有使用abstract來修飾。

該方法是在ReentrantLock中的FairSync和NonfairSync的兩個內(nèi)部類來實(shí)現(xiàn)的,這里以FairSysc-公平鎖來說明:

protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c acquires; if (nextc < 0) throw new Error('Maximum lock count exceeded'); setState(nextc); return true; } return false; }

addWaiter方法

看下addWaiter方法的定義:

private Node addWaiter(Node mode) {
    // 根據(jù)當(dāng)前線程創(chuàng)建一個Node對象
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    // 判斷tail是否為空,如果為空表示隊(duì)列是空的,直接enq
    if (pred != null) {
        node.prev = pred;
        // 這里嘗試CAS來設(shè)置隊(duì)尾,如果成功則將當(dāng)前節(jié)點(diǎn)設(shè)置為tail,否則enq
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

該方法就是根據(jù)當(dāng)前線程創(chuàng)建一個Node,然后添加到隊(duì)列尾部。

enq方法

private Node enq(final Node node) { // 重復(fù)直到成功 for (;;) { Node t = tail; // 如果tail為null,則必須創(chuàng)建一個Node節(jié)點(diǎn)并進(jìn)行初始化 if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; // 嘗試CAS來設(shè)置隊(duì)尾 if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }

acquireQueued方法

該方法的功能是循環(huán)的嘗試獲取鎖,直到成功為止,最后返回中斷標(biāo)志位。

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        // 中斷標(biāo)志位
        boolean interrupted = false;
        for (;;) {
            // 獲取前繼節(jié)點(diǎn)
            final Node p = node.predecessor();
            // 如果前繼節(jié)點(diǎn)是head,則嘗試獲取
            if (p == head && tryAcquire(arg)) {
                // 設(shè)置head為當(dāng)前節(jié)點(diǎn)(head中不包含thread)
                setHead(node);
                // 清除之前的head
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // 如果p不是head或者獲取鎖失敗,判斷是否需要進(jìn)行park
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

這里有幾個問題很重要:

  • 什么條件下需要park?
  • 為什么要判斷中斷狀態(tài)?
  • 死循環(huán)不會引起CPU使用率飆升?

下面分別來分析一下。

什么條件下需要park?

看下shouldParkAfterFailedAcquire方法的代碼:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true; if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
  • 如果前一個節(jié)點(diǎn)的狀態(tài)是SIGNAL,則需要park;
  • 如果ws > 0,表示已被取消,刪除狀態(tài)是已取消的節(jié)點(diǎn);
  • 其他情況,設(shè)置前繼節(jié)點(diǎn)的狀態(tài)為SIGNAL。

可見,只有在前繼節(jié)點(diǎn)的狀態(tài)是SIGNAL時,需要park。第二種情況稍后會詳細(xì)介紹。

為什么要判斷中斷狀態(tài)?

首先要知道,acquireQueued方法中獲取鎖的方式是死循環(huán),判斷是否中斷是在parkAndCheckInterrupt方法中實(shí)現(xiàn)的,看下該方法的代碼:

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

非常簡單,阻塞當(dāng)前線程,然后返回線程的中斷狀態(tài)并復(fù)位中斷狀態(tài)。

注意interrupted()方法的作用,該方法是獲取線程的中斷狀態(tài),并復(fù)位,也就是說,如果當(dāng)前線程是中斷狀態(tài),則第一次調(diào)用該方法獲取的是true,第二次則是false。而isInterrupted()方法則只是返回線程的中斷狀態(tài),不執(zhí)行復(fù)位操作。

如果acquireQueued執(zhí)行完畢,返回中斷狀態(tài),回到acquire方法中,根據(jù)返回的中斷狀態(tài)判斷是否需要執(zhí)行Thread.currentThread().interrupt()。

為什么要多做這一步呢?先判斷中斷狀態(tài),然后復(fù)位,如果之前線程是中斷狀態(tài),再進(jìn)行中斷?

這里就要介紹一下park方法了。park方法是Unsafe類中的方法,與之對應(yīng)的是unpark方法。簡單來說,當(dāng)前線程如果執(zhí)行了park方法,也就是阻塞了當(dāng)前線程,反之,unpark就是喚醒一個線程。

具體的說明請參考http://blog.csdn.net/hengyunabc/article/details/28126139

park與wait的作用類似,但是對中斷狀態(tài)的處理并不相同。如果當(dāng)前線程不是中斷的狀態(tài),park與wait的效果是一樣的;如果一個線程是中斷的狀態(tài),這時執(zhí)行wait方法會報(bào)java.lang.IllegalMonitorStateException,而執(zhí)行park時并不會報(bào)異常,而是直接返回。

所以,知道了這一點(diǎn),就可以知道為什么要進(jìn)行中斷狀態(tài)的復(fù)位了:

  • 如果當(dāng)前線程是非中斷狀態(tài),則在執(zhí)行park時被阻塞,這是返回中斷狀態(tài)是false
  • 如果當(dāng)前線程是中斷狀態(tài),則park方法不起作用,會立即返回,然后parkAndCheckInterrupt方法會獲取中斷的狀態(tài),也就是true,并復(fù)位;
  • 再次執(zhí)行循環(huán)的時候,由于在前一步已經(jīng)把該線程的中斷狀態(tài)進(jìn)行了復(fù)位,則再次調(diào)用park方法時會阻塞。

所以,這里判斷線程中斷的狀態(tài)實(shí)際上是為了不讓循環(huán)一直執(zhí)行,要讓當(dāng)前線程進(jìn)入阻塞的狀態(tài)。想象一下,如果不這樣判斷,前一個線程在獲取鎖之后執(zhí)行了很耗時的操作,那么豈不是要一直執(zhí)行該死循環(huán)?這樣就造成了CPU使用率飆升,這是很嚴(yán)重的后果。

死循環(huán)不會引起CPU使用率飆升?

上面已經(jīng)說明。

cancelAcquire方法

在acquireQueued方法的finally語句塊中,如果在循環(huán)的過程中出現(xiàn)了異常,則執(zhí)行cancelAcquire方法,用于將該節(jié)點(diǎn)標(biāo)記為取消狀態(tài)。該方法代碼如下:

private void cancelAcquire(Node node) { // Ignore if node doesn't exist if (node == null) return; // 設(shè)置該節(jié)點(diǎn)不再關(guān)聯(lián)任何線程 node.thread = null; // Skip cancelled predecessors // 通過前繼節(jié)點(diǎn)跳過取消狀態(tài)的node Node pred = node.prev; while (pred.waitStatus > 0) node.prev = pred = pred.prev; // predNext is the apparent node to unsplice. CASes below will // fail if not, in which case, we lost race vs another cancel // or signal, so no further action is necessary. // 獲取過濾后的前繼節(jié)點(diǎn)的后繼節(jié)點(diǎn) Node predNext = pred.next; // Can use unconditional write instead of CAS here. // After this atomic step, other Nodes can skip past us. // Before, we are free of interference from other threads. // 設(shè)置狀態(tài)為取消狀態(tài) node.waitStatus = Node.CANCELLED; /* * If we are the tail, remove ourselves. * 1.如果當(dāng)前節(jié)點(diǎn)是tail: * 嘗試更新tail節(jié)點(diǎn),設(shè)置tail為pred; * 更新失敗則返回,成功則設(shè)置tail的后繼節(jié)點(diǎn)為null */ if (node == tail && compareAndSetTail(node, pred)) { compareAndSetNext(pred, predNext, null); } else { // If successor needs signal, try to set pred's next-link // so it will get one. Otherwise wake it up to propagate. int ws; /* * 2.如果當(dāng)前節(jié)點(diǎn)不是head的后繼節(jié)點(diǎn): * 判斷當(dāng)前節(jié)點(diǎn)的前繼節(jié)點(diǎn)的狀態(tài)是否是SIGNAL,如果不是則嘗試設(shè)置前繼節(jié)點(diǎn)的狀態(tài)為SIGNAL; * 上面兩個條件如果有一個返回true,則再判斷前繼節(jié)點(diǎn)的thread是否不為空; * 若滿足以上條件,則嘗試設(shè)置當(dāng)前節(jié)點(diǎn)的前繼節(jié)點(diǎn)的后繼節(jié)點(diǎn)為當(dāng)前節(jié)點(diǎn)的后繼節(jié)點(diǎn),也就是相當(dāng)于將當(dāng)前節(jié)點(diǎn)從隊(duì)列中刪除 */ if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { Node next = node.next; if (next != null && next.waitStatus <= 0) compareAndSetNext(pred, predNext, next); } else { // 3.如果是head的后繼節(jié)點(diǎn)或者狀態(tài)判斷或設(shè)置失敗,則喚醒當(dāng)前節(jié)點(diǎn)的后繼節(jié)點(diǎn) unparkSuccessor(node); } node.next = node; // help GC } }

該方法中執(zhí)行的過程有些復(fù)雜,首先是要獲取當(dāng)前節(jié)點(diǎn)的前繼節(jié)點(diǎn),如果前繼節(jié)點(diǎn)的狀態(tài)不是取消狀態(tài)(即pred.waitStatus > 0),則向前遍歷隊(duì)列,直到遇到第一個waitStatus <= 0的節(jié)點(diǎn),并把當(dāng)前節(jié)點(diǎn)的前繼節(jié)點(diǎn)設(shè)置為該節(jié)點(diǎn),然后設(shè)置當(dāng)前節(jié)點(diǎn)的狀態(tài)為取消狀態(tài)。

接下來的工作可以分為3種情況:

  • 當(dāng)前節(jié)點(diǎn)是tail;
  • 當(dāng)前節(jié)點(diǎn)不是head的后繼節(jié)點(diǎn)(即隊(duì)列的第一個節(jié)點(diǎn),不包括head),也不是tail;
  • 當(dāng)前節(jié)點(diǎn)是head的后繼節(jié)點(diǎn)。

我們依次來分析一下:

當(dāng)前節(jié)點(diǎn)是tail

這種情況很簡單,因?yàn)閠ail是隊(duì)列的最后一個節(jié)點(diǎn),如果該節(jié)點(diǎn)需要取消,則直接把該節(jié)點(diǎn)的前繼節(jié)點(diǎn)的next指向null,也就是把當(dāng)前節(jié)點(diǎn)移除隊(duì)列。出隊(duì)的過程如下:

注意:經(jīng)驗(yàn)證,這里并沒有設(shè)置node的prev為null。

當(dāng)前節(jié)點(diǎn)不是head的后繼節(jié)點(diǎn),也不是tail

這里將node的前繼節(jié)點(diǎn)的next指向了node的后繼節(jié)點(diǎn),真正執(zhí)行的代碼就是如下一行:

compareAndSetNext(pred, predNext, next);

當(dāng)前節(jié)點(diǎn)是head的后繼節(jié)點(diǎn)

這里直接unpark后繼節(jié)點(diǎn)的線程,然后將next指向了自己。

這里可能會有疑問,既然要刪除節(jié)點(diǎn),為什么都沒有對prev進(jìn)行操作,而僅僅是修改了next?

要明確的一點(diǎn)是,這里修改指針的操作都是CAS操作,在AQS中所有以compareAndSet開頭的方法都是嘗試更新,并不保證成功,圖中所示的都是執(zhí)行成功的情況。

那么在執(zhí)行cancelAcquire方法時,當(dāng)前節(jié)點(diǎn)的前繼節(jié)點(diǎn)有可能已經(jīng)執(zhí)行完并移除隊(duì)列了(參見setHead方法),所以在這里只能用CAS來嘗試更新,而就算是嘗試更新,也只能更新next,不能更新prev,因?yàn)閜rev是不確定的,否則有可能會導(dǎo)致整個隊(duì)列的不完整,例如把prev指向一個已經(jīng)移除隊(duì)列的node。

什么時候修改prev呢?其實(shí)prev是由其他線程來修改的?;厝タ聪聅houldParkAfterFailedAcquire方法,該方法有這樣一段代碼:

do {
    node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;

該段代碼的作用就是通過prev遍歷到第一個不是取消狀態(tài)的node,并修改prev。

這里為什么可以更新prev?因?yàn)閟houldParkAfterFailedAcquire方法是在獲取鎖失敗的情況下才能執(zhí)行,因此進(jìn)入該方法時,說明已經(jīng)有線程獲得鎖了,并且在執(zhí)行該方法時,當(dāng)前節(jié)點(diǎn)之前的節(jié)點(diǎn)不會變化(因?yàn)橹挥挟?dāng)下一個節(jié)點(diǎn)獲得鎖的時候才會設(shè)置head),所以這里可以更新prev,而且不必用CAS來更新。

AQS釋放獨(dú)占鎖的實(shí)現(xiàn)

釋放通過unlock方法來實(shí)現(xiàn):

public void unlock() { sync.release(1); }

該方法調(diào)用了release方法,release是在AQS中定義的,看下release代碼:

public final boolean release(int arg) {
    // 嘗試釋放鎖
    if (tryRelease(arg)) {
        // 釋放成功后unpark后繼節(jié)點(diǎn)的線程
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

這里首先嘗試著去釋放鎖,成功了之后要去喚醒后繼節(jié)點(diǎn)的線程,這樣其他的線程才有機(jī)會去執(zhí)行。

tryRelease代碼如下:

protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); }

是不是和tryAcquire方法類似?該方法也需要被重寫,在Sync類中的代碼如下:

protected final boolean tryRelease(int releases) {
    // 這里是將鎖的數(shù)量減1
    int c = getState() - releases;
    // 如果釋放的線程和獲取鎖的線程不是同一個,拋出非法監(jiān)視器狀態(tài)異常
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    // 由于重入的關(guān)系,不是每次釋放鎖c都等于0,
    // 直到最后一次釋放鎖時,才會把當(dāng)前線程釋放
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    // 記錄鎖的數(shù)量
    setState(c);
    return free;
}

當(dāng)前線程被釋放之后,需要喚醒下一個節(jié)點(diǎn)的線程,通過unparkSuccessor方法來實(shí)現(xiàn):

private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }

主要功能就是要喚醒下一個線程,這里s == null || s.waitStatus > 0判斷后繼節(jié)點(diǎn)是否為空或者是否是取消狀態(tài),然后從隊(duì)列尾部向前遍歷找到最前面的一個waitStatus小于0的節(jié)點(diǎn),至于為什么從尾部開始向前遍歷,回想一下cancelAcquire方法的處理過程,cancelAcquire只是設(shè)置了next的變化,沒有設(shè)置prev的變化,在最后有這樣一行代碼:node.next = node,如果這時執(zhí)行了unparkSuccessor方法,并且向后遍歷的話,就成了死循環(huán)了,所以這時只有prev是穩(wěn)定的。

到這里,通過ReentrantLock的lock和unlock來分析AQS獨(dú)占鎖的實(shí)現(xiàn)已經(jīng)基本完成了,但ReentrantLock還有一個非公平鎖NonfairSync。

其實(shí)NonfairSync和FairSync主要就是在獲取鎖的方式上不同,公平鎖是按順序去獲取,而非公平鎖是搶占式的獲取,lock的時候先去嘗試修改state變量,如果搶占成功,則獲取到鎖:

final void lock() {
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}

非公平鎖的tryAcquire方法調(diào)用了nonfairTryAcquire方法:

final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c acquires; if (nextc < 0) // overflow throw new Error('Maximum lock count exceeded'); setState(nextc); return true; } return false; }

該方法比公平鎖的tryAcquire方法在第二個if判斷中少了一個是否存在前繼節(jié)點(diǎn)判斷,F(xiàn)airSync中的tryAcquire代碼中的這個if語句塊如下:

if (!hasQueuedPredecessors() &&
    compareAndSetState(0, acquires)) {
    setExclusiveOwnerThread(current);
    return true;
}

總結(jié)

本文從ReentrantLock出發(fā),比較完整的分析了AQS內(nèi)部獨(dú)占鎖的實(shí)現(xiàn),總體來說實(shí)現(xiàn)的思路很清晰,就是使用了標(biāo)志位 隊(duì)列的方式來處理鎖的狀態(tài),包括鎖的獲取,鎖的競爭以及鎖的釋放。在AQS中,state可以表示鎖的數(shù)量,也可以表示其他狀態(tài),state的含義由子類去定義,自己只是提供了對state的維護(hù)。AQS通過state來實(shí)現(xiàn)線程對資源的訪問控制,而state具體的含義要在子類中定義。

AQS在隊(duì)列的維護(hù)上的實(shí)現(xiàn)比較復(fù)雜,尤其是節(jié)點(diǎn)取消時隊(duì)列的維護(hù),這里并不是通過一個線程來完成的。同時,AQS中大量的使用CAS來實(shí)現(xiàn)更新,這種更新能夠保證狀態(tài)和隊(duì)列的完整性。


    本站是提供個人知識管理的網(wǎng)絡(luò)存儲空間,所有內(nèi)容均由用戶發(fā)布,不代表本站觀點(diǎn)。請注意甄別內(nèi)容中的聯(lián)系方式、誘導(dǎo)購買等信息,謹(jǐn)防詐騙。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請點(diǎn)擊一鍵舉報(bào)。
    轉(zhuǎn)藏 分享 獻(xiàn)花(0

    0條評論

    發(fā)表

    請遵守用戶 評論公約

    類似文章 更多