小男孩‘自慰网亚洲一区二区,亚洲一级在线播放毛片,亚洲中文字幕av每天更新,黄aⅴ永久免费无码,91成人午夜在线精品,色网站免费在线观看,亚洲欧洲wwwww在线观看

分享

深入淺出NIO Socket

 gljin_cn 2016-09-09

前言

Java NIO 由以下幾個核心部分組成:

  • Buffer
  • Channel
  • Selector

以前基于net包進行socket編程時,accept方法會一直阻塞,直到有客戶端請求的到來,并返回socket進行相應的處理。整個過程是流水線的,處理完一個請求,才能去獲取并處理后面的請求;當然我們可以把獲取socket和處理socket的過程分開,一個線程負責accept,線程池負責處理請求。

NIO為我們提供了更好的解決方案,采用選擇器(Selector)找出已經(jīng)準備好讀寫的socket,并按順序處理,基于通道(Channel)和緩沖區(qū)(Buffer)來傳輸和保存數(shù)據(jù)。

Buffer和Channel已經(jīng)介紹過深入淺出NIO Channel和Buffer,本文主要介紹NIO的Selector和Socket的實踐以及實現(xiàn)原理。

Selector是什么?

在養(yǎng)雞場,有這一個人,每天的工作就是不停檢查幾個特殊的雞籠,如果有雞進來,有雞出去,有雞生蛋,有雞生病等等,就把相應的情況記錄下來。這樣,如果負責人想知道雞場情況,只需要到那個人查詢即可,當然前提是,負責得讓那個人知道需要記錄哪些情況。

Selector的作用相當這個人的工作,每個雞籠相當于一個SocketChannel,單個線程通過Selector可以管理多個SocketChannel。


A Thread uses a Selector to handle 3 Channels

為了實現(xiàn)Selector管理多個SocketChannel,必須將多個具體的SocketChannel對象注冊到Selector對象,并聲明需要監(jiān)聽的事件,目前有4種類型的事件:

  • connect:客戶端連接服務端事件,對應值為SelectionKey.OP_CONNECT(8)
  • accept:服務端接收客戶端連接事件,對應值為SelectionKey.OP_ACCEPT(16)
  • read:讀事件,對應值為SelectionKey.OP_READ(1)
  • write:寫事件,對應值為SelectionKey.OP_WRITE(4)

當SocketChannel有對應的事件發(fā)生時,Selector能夠覺察到并進行相應的處理。

為了更好地理解NIO Socket,先來看一段服務端的示例代碼

ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
serverChannel.socket().bind(new InetSocketAddress(port));
Selector selector = Selector.open();
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
while(true){
    int n = selector.select();
    if (n == 0) continue;
    Iterator ite = this.selector.selectedKeys().iterator();
    while(ite.hasNext()){
        SelectionKey key = (SelectionKey)ite.next();
        if (key.isAcceptable()){
            SocketChannel clntChan = ((ServerSocketChannel) key.channel()).accept();
            clntChan.configureBlocking(false);
            //將選擇器注冊到連接到的客戶端信道,
            //并指定該信道key值的屬性為OP_READ,
            //同時為該信道指定關聯(lián)的附件
            clntChan.register(key.selector(), SelectionKey.OP_READ, ByteBuffer.allocate(bufSize));
        }
        if (key.isReadable()){
            handleRead(key);
        }
        if (key.isWritable() && key.isValid()){
            handleWrite(key);
        }
        if (key.isConnectable()){
            System.out.println("isConnectable = true");
        }
      ite.remove();
    }
}

服務端連接過程
1、創(chuàng)建ServerSocketChannel實例serverSocketChannel,并bind到指定端口。
2、創(chuàng)建Selector實例selector;
3、將serverSocketChannel注冊到selector,并指定事件OP_ACCEPT。
4、while循環(huán)執(zhí)行:
4.1、調(diào)用select方法,該方法會阻塞等待,直到有一個或多個通道準備好了I/O操作或等待超時。
4.2、獲取選取的鍵列表;
4.3、循環(huán)鍵集中的每個鍵:
4.3.a、獲取通道,并從鍵中獲取附件(如果添加了附件);
4.3.b、確定準備就緒的操縱并執(zhí)行,如果是accept操作,將接收的信道設置為非阻塞模式,并注冊到選擇器;
4.3.c、如果需要,修改鍵的興趣操作集;
4.3.d、從已選鍵集中移除鍵

在步驟3中,selector只注冊了serverSocketChannel的OP_ACCEPT事件

  • 如果有客戶端A連接服務,執(zhí)行select方法時,可以通過serverSocketChannel獲取客戶端A的socketChannel,并在selector上注冊socketChannel的OP_READ事件。
  • 如果客戶端A發(fā)送數(shù)據(jù),會觸發(fā)read事件,這樣下次輪詢調(diào)用select方法時,就能通過socketChannel讀取數(shù)據(jù),同時在selector上注冊該socketChannel的OP_WRITE事件,實現(xiàn)服務器往客戶端寫數(shù)據(jù)。

NIO Socket實現(xiàn)原理

SocketChannel、ServerSocketChannel和Selector的實例初始化都通過SelectorProvider類實現(xiàn),其中Selector是整個NIO Socket的核心實現(xiàn)。

public static SelectorProvider provider() {
    synchronized (lock) {
        if (provider != null)
            return provider;
        return AccessController.doPrivileged(
            new PrivilegedAction<SelectorProvider>() {
                public SelectorProvider run() {
                        if (loadProviderFromProperty())
                            return provider;
                        if (loadProviderAsService())
                            return provider;
                        provider = sun.nio.ch.DefaultSelectorProvider.create();
                        return provider;
                    }
                });
    }
}

SelectorProvider在windows和linux下有不同的實現(xiàn),provider方法會返回對應的實現(xiàn)。

Selector分析

Selector是如何做到同時管理多個socket?

Selector初始化時,會實例化PollWrapper、SelectionKeyImpl數(shù)組和Pipe。

WindowsSelectorImpl(SelectorProvider sp) throws IOException {
    super(sp);
    pollWrapper = new PollArrayWrapper(INIT_CAP);
    wakeupPipe = Pipe.open();
    wakeupSourceFd = ((SelChImpl)wakeupPipe.source()).getFDVal();

    // Disable the Nagle algorithm so that the wakeup is more immediate
    SinkChannelImpl sink = (SinkChannelImpl)wakeupPipe.sink();
    (sink.sc).socket().setTcpNoDelay(true);
    wakeupSinkFd = ((SelChImpl)sink).getFDVal();
    pollWrapper.addWakeupSocket(wakeupSourceFd, 0);
}

pollWrapper用Unsafe類申請一塊物理內(nèi)存,存放注冊時的socket句柄fdVal和event的數(shù)據(jù)結(jié)構(gòu)pollfd,其中pollfd共8位,0~3位保存socket句柄,4~7位保存event。


pollfd

pollWrapper


pollWrapper提供了fdVal和event數(shù)據(jù)的相應操作,如添加操作通過Unsafe的putInt和putShort實現(xiàn)。

void putDescriptor(int i, int fd) {
    pollArray.putInt(SIZE_POLLFD * i + FD_OFFSET, fd);
}
void putEventOps(int i, int event) {
    pollArray.putShort(SIZE_POLLFD * i + EVENT_OFFSET, (short)event);
}

SelectionKeyImpl保存注冊時的channel、selector、event以及保存在pollWrapper的偏移位置index。

先看看serverChannel.register(selector, SelectionKey.OP_ACCEPT)是如何實現(xiàn)的:

public final SelectionKey register(Selector sel, int ops, Object att)
    throws ClosedChannelException {
    synchronized (regLock) {
        SelectionKey k = findKey(sel);
        if (k != null) {
            k.interestOps(ops);
            k.attach(att);
        }
        if (k == null) {
            // New registration
            synchronized (keyLock) {
                if (!isOpen())
                    throw new ClosedChannelException();
                k = ((AbstractSelector)sel).register(this, ops, att);
                addKey(k);
            }
        }
        return k;
    }
}
  1. 如果該channel和selector已經(jīng)注冊過,則直接添加事件和附件。
  2. 否則通過selector實現(xiàn)注冊過程。
protected final SelectionKey register(AbstractSelectableChannel ch,
      int ops,  Object attachment) {
    if (!(ch instanceof SelChImpl))
        throw new IllegalSelectorException();
    SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this);
    k.attach(attachment);
    synchronized (publicKeys) {
        implRegister(k);
    }
    k.interestOps(ops);
    return k;
}

protected void implRegister(SelectionKeyImpl ski) {
    synchronized (closeLock) {
        if (pollWrapper == null)
            throw new ClosedSelectorException();
        growIfNeeded();
        channelArray[totalChannels] = ski;
        ski.setIndex(totalChannels);
        fdMap.put(ski);
        keys.add(ski);
        pollWrapper.addEntry(totalChannels, ski);
        totalChannels++;
    }
}
  1. 以當前channel和selector為參數(shù),初始化 SelectionKeyImpl 對象selectionKeyImpl ,并添加附件attachment。
  2. 如果當前channel的數(shù)量totalChannels等于SelectionKeyImpl數(shù)組大小,對SelectionKeyImpl數(shù)組和pollWrapper進行擴容操作。
  3. 如果totalChannels % MAX_SELECTABLE_FDS == 0,則多開一個線程處理selector。
  4. pollWrapper.addEntry將把selectionKeyImpl中的socket句柄添加到對應的pollfd。
  5. k.interestOps(ops)方法最終也會把event添加到對應的pollfd。

所以,不管serverSocketChannel,還是socketChannel,在selector注冊事件后,最終都保存在pollArray中。

接著,再來看看selector中的select是如何實現(xiàn)一次獲取多個有事件發(fā)生的channel的。
底層由selector實現(xiàn)類的doSelect方法實現(xiàn),如下:

 protected int doSelect(long timeout) throws IOException {
        if (channelArray == null)
            throw new ClosedSelectorException();
        this.timeout = timeout; // set selector timeout
        processDeregisterQueue();
        if (interruptTriggered) {
            resetWakeupSocket();
            return 0;
        }
        // Calculate number of helper threads needed for poll. If necessary
        // threads are created here and start waiting on startLock
        adjustThreadsCount();
        finishLock.reset(); // reset finishLock
        // Wakeup helper threads, waiting on startLock, so they start polling.
        // Redundant threads will exit here after wakeup.
        startLock.startThreads();
        // do polling in the main thread. Main thread is responsible for
        // first MAX_SELECTABLE_FDS entries in pollArray.
        try {
            begin();
            try {
                subSelector.poll();
            } catch (IOException e) {
                finishLock.setException(e); // Save this exception
            }
            // Main thread is out of poll(). Wakeup others and wait for them
            if (threads.size() > 0)
                finishLock.waitForHelperThreads();
          } finally {
              end();
          }
        // Done with poll(). Set wakeupSocket to nonsignaled  for the next run.
        finishLock.checkForException();
        processDeregisterQueue();
        int updated = updateSelectedKeys();
        // Done with poll(). Set wakeupSocket to nonsignaled  for the next run.
        resetWakeupSocket();
        return updated;
    }

其中 subSelector.poll() 是select的核心,由native函數(shù)poll0實現(xiàn),readFds、writeFds 和exceptFds數(shù)組用來保存底層select的結(jié)果,數(shù)組的第一個位置都是存放發(fā)生事件的socket的總數(shù),其余位置存放發(fā)生事件的socket句柄fd。

private final int[] readFds = new int [MAX_SELECTABLE_FDS + 1];
private final int[] writeFds = new int [MAX_SELECTABLE_FDS + 1];
private final int[] exceptFds = new int [MAX_SELECTABLE_FDS + 1];
private int poll() throws IOException{ // poll for the main thread
     return poll0(pollWrapper.pollArrayAddress,
          Math.min(totalChannels, MAX_SELECTABLE_FDS),
             readFds, writeFds, exceptFds, timeout);
}

執(zhí)行 selector.select() ,poll0函數(shù)把指向socket句柄和事件的內(nèi)存地址傳給底層函數(shù)。

  1. 如果之前沒有發(fā)生事件,程序就阻塞在select處,當然不會一直阻塞,因為epoll在timeout時間內(nèi)如果沒有事件,也會返回。
  2. 一旦有對應的事件發(fā)生,poll0方法就會返回。
  3. processDeregisterQueue方法會清理那些已經(jīng)cancelled的SelectionKey
  4. updateSelectedKeys方法統(tǒng)計有事件發(fā)生的SelectionKey數(shù)量,并把符合條件發(fā)生事件的SelectionKey添加到selectedKeys哈希表中,提供給后續(xù)使用。

在早期的JDK1.4和1.5 update10版本之前,Selector基于select/poll模型實現(xiàn),是基于IO復用技術的非阻塞IO,不是異步IO。在JDK1.5 update10和linux core2.6以上版本,sun優(yōu)化了Selctor的實現(xiàn),底層使用epoll替換了select/poll。

epoll原理

epoll是Linux下的一種IO多路復用技術,可以非常高效的處理數(shù)以百萬計的socket句柄。

先看看使用c封裝的3個epoll系統(tǒng)調(diào)用:

  • int epoll_create(int size)
    epoll_create建立一個epoll對象。參數(shù)size是內(nèi)核保證能夠正確處理的最大句柄數(shù),多于這個最大數(shù)時內(nèi)核可不保證效果。
  • int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)
    epoll_ctl可以操作epoll_create創(chuàng)建的epoll,如將socket句柄加入到epoll中讓其監(jiān)控,或把epoll正在監(jiān)控的某個socket句柄移出epoll。
  • int epoll_wait(int epfd, struct epoll_event *events,int maxevents, int timeout)
    epoll_wait在調(diào)用時,在給定的timeout時間內(nèi),所監(jiān)控的句柄中有事件發(fā)生時,就返回用戶態(tài)的進程。

大概看看epoll內(nèi)部是怎么實現(xiàn)的:

  1. epoll初始化時,會向內(nèi)核注冊一個文件系統(tǒng),用于存儲被監(jiān)控的句柄文件,調(diào)用epoll_create時,會在這個文件系統(tǒng)中創(chuàng)建一個file節(jié)點。同時epoll會開辟自己的內(nèi)核高速緩存區(qū),以紅黑樹的結(jié)構(gòu)保存句柄,以支持快速的查找、插入、刪除。還會再建立一個list鏈表,用于存儲準備就緒的事件。
  2. 當執(zhí)行epoll_ctl時,除了把socket句柄放到epoll文件系統(tǒng)里file對象對應的紅黑樹上之外,還會給內(nèi)核中斷處理程序注冊一個回調(diào)函數(shù),告訴內(nèi)核,如果這個句柄的中斷到了,就把它放到準備就緒list鏈表里。所以,當一個socket上有數(shù)據(jù)到了,內(nèi)核在把網(wǎng)卡上的數(shù)據(jù)copy到內(nèi)核中后,就把socket插入到就緒鏈表里。
  3. 當epoll_wait調(diào)用時,僅僅觀察就緒鏈表里有沒有數(shù)據(jù),如果有數(shù)據(jù)就返回,否則就sleep,超時時立刻返回。

epoll的兩種工作模式:

  • LT:level-trigger,水平觸發(fā)模式,只要某個socket處于readable/writable狀態(tài),無論什么時候進行epoll_wait都會返回該socket。
  • ET:edge-trigger,邊緣觸發(fā)模式,只有某個socket從unreadable變?yōu)閞eadable或從unwritable變?yōu)閣ritable時,epoll_wait才會返回該socket。

socket讀數(shù)據(jù)


socket讀數(shù)據(jù)


socket寫數(shù)據(jù)


socket寫數(shù)據(jù)

read實現(xiàn)

通過遍歷selector中的SelectionKeyImpl數(shù)組,獲取發(fā)生事件的socketChannel對象,其中保存了對應的socket句柄,實現(xiàn)如下。

public int read(ByteBuffer buf) throws IOException {
    if (buf == null)
        throw new NullPointerException();
    synchronized (readLock) {
        if (!ensureReadOpen())
            return -1;
        int n = 0;
        try {
            begin();
            synchronized (stateLock) {
                if (!isOpen()) {         
                    return 0;
                }
                readerThread = NativeThread.current();
            }
            for (;;) {
                n = IOUtil.read(fd, buf, -1, nd);
                if ((n == IOStatus.INTERRUPTED) && isOpen()) {
                    // The system call was interrupted but the channel
                    // is still open, so retry
                    continue;
                }
                return IOStatus.normalize(n);
            }
        } finally {
            readerCleanup();        // Clear reader thread
            // The end method, which 
            end(n > 0 || (n == IOStatus.UNAVAILABLE));

            // Extra case for socket channels: Asynchronous shutdown
            //
            synchronized (stateLock) {
                if ((n <= 0) && (!isInputOpen))
                    return IOStatus.EOF;
            }
            assert IOStatus.check(n);
        }
    }
}

通過Buffer的方式讀取socket的數(shù)據(jù)。

wakeup實現(xiàn)

public Selector wakeup() {
    synchronized (interruptLock) {
        if (!interruptTriggered) {
            setWakeupSocket();
            interruptTriggered = true;
        }
    }
    return this;
}

// Sets Windows wakeup socket to a signaled state.
private void setWakeupSocket() {
   setWakeupSocket0(wakeupSinkFd);
}
private native void setWakeupSocket0(int wakeupSinkFd);

看來wakeupSinkFd這個變量是為wakeup方法使用的。
其中interruptTriggered為中斷已觸發(fā)標志,當pollWrapper.interrupt()之后,該標志即為true了;因為這個標志,連續(xù)兩次wakeup,只會有一次效果。

為了實現(xiàn)client和server的數(shù)據(jù)交互,Linux下采用管道pipe實現(xiàn),windows下采用兩個socket之間的通信進行實現(xiàn),它們都有這樣的特性:

  1. 都有兩個端,一個 是read端,一個是write端,windows中兩個socket也是read和write的角色。
  2. 當往write端寫入 數(shù)據(jù),則read端即可以收到數(shù)據(jù)。

...以后有什么想到會繼續(xù)補充。

ENDING。
我是占小狼。
在魔都艱苦奮斗,白天是上班族,晚上是知識服務工作者。
讀完我的文章有收獲,記得關注和點贊哦,如果非要打賞,我也是不會拒絕的啦!

    本站是提供個人知識管理的網(wǎng)絡存儲空間,所有內(nèi)容均由用戶發(fā)布,不代表本站觀點。請注意甄別內(nèi)容中的聯(lián)系方式、誘導購買等信息,謹防詐騙。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請點擊一鍵舉報。
    轉(zhuǎn)藏 分享 獻花(0

    0條評論

    發(fā)表

    請遵守用戶 評論公約

    類似文章 更多