三、NIO
1、NIO三大核心部分:
Channel:通道,可以理解為是鐵路;
Buffer:緩沖區(qū),可以理解為火車(chē);程序不是直接通過(guò)channel讀寫(xiě)數(shù)據(jù),而是通過(guò)buffer。這很好理解,火車(chē)裝著貨物跑到鐵路上,對(duì)應(yīng)了buffer裝著數(shù)據(jù)跑在channel上;
Selector:選擇器,就是上面NIO模型圖中的selector,selector發(fā)現(xiàn)這個(gè)通道有內(nèi)容要讀取,就處理這個(gè)通道,如果這個(gè)通道沒(méi)啥事兒,它不會(huì)阻塞在這里等這個(gè)通道,而是去看別的通道有沒(méi)有內(nèi)容要讀取,如果都沒(méi)有,管理selector的這個(gè)線程還可以去做別的事。
selector、buffer、channel之間的關(guān)系:
每個(gè)channel都會(huì)對(duì)應(yīng)一個(gè)buffer;一個(gè)channel可以理解為就是一個(gè)連接;
一個(gè)selector對(duì)應(yīng)一個(gè)線程;一個(gè)selector對(duì)應(yīng)多個(gè)channel;
程序切換到哪個(gè)channel是由事件決定;
selector會(huì)根據(jù)不同的事件,在各通道上切換;
buffer底層是一個(gè)數(shù)組;
數(shù)據(jù)的讀取和寫(xiě)入是通過(guò)buffer來(lái)完成的;BIO的讀取和寫(xiě)入是通過(guò)輸入輸出流,不能雙向,而buffer是雙向的;
2、buffer:
buffer有四個(gè)重要的屬性:
capacity:容量,該buffer能夠容納的最大數(shù)據(jù)量,緩沖區(qū)創(chuàng)建時(shí)被設(shè)定且不能修改;
limit:緩沖區(qū)當(dāng)前的終點(diǎn),不能對(duì)緩沖區(qū)超出終點(diǎn)的位置進(jìn)行讀寫(xiě),limit是可變的;
position:下一個(gè)要被讀或?qū)懙脑氐乃饕?,每次讀寫(xiě)都會(huì)改變?cè)撝担?/span>
mark:標(biāo)記;
buffer屬性讀取數(shù)據(jù)的時(shí)候可以設(shè)置position和limit,表示從哪兒開(kāi)始讀,讀到哪兒結(jié)束。
3、channel:
channel類(lèi)似BIO的流,但是有些區(qū)別,如下:
通過(guò)buffer,可以同時(shí)進(jìn)行讀寫(xiě),而流只能讀或者寫(xiě);
通道可以實(shí)現(xiàn)異步讀寫(xiě)數(shù)據(jù);
通道可以從緩沖區(qū)讀數(shù)據(jù),也可以寫(xiě)數(shù)據(jù)到緩沖區(qū);
channel是一個(gè)接口,用得比較多的實(shí)現(xiàn)有如下幾個(gè):
FileChannel:對(duì)文件進(jìn)行操作的;
DatagramChannel:通過(guò) UDP 讀寫(xiě)網(wǎng)絡(luò)中的數(shù)據(jù)通道;
ServerSocketChannel:類(lèi)似ServerSocket;
SocketChannel:類(lèi)似Socket
---
看幾個(gè)實(shí)操案例:
public class NioFileChannel01 {
public static void main(String[] args) throws IOException {
String str = "帶你去爬山啊";
FileOutputStream fos = new FileOutputStream("C:\\Users\\14751\\Desktop\\test01.txt");
// 1. 通過(guò)FileOutputStream獲取對(duì)應(yīng)的FileChannel
FileChannel fc = fos.getChannel();
// 2. 創(chuàng)建緩沖區(qū)
ByteBuffer buffer = ByteBuffer.allocate(1024);
// 3. 將str放入buffer中
buffer.put(str.getBytes());
// 4. 切換寫(xiě)數(shù)據(jù)模式
buffer.flip();
// 5. 將buffer數(shù)據(jù)寫(xiě)入到通道
fc.write(buffer);
// 6. 關(guān)閉資源
fos.close();
fc.close();
}
}
public class NioFileChannel02 {
public static void main(String[] args) throws IOException {
// 1. 讀取test01.txt文件
File file = new File("C:\\Users\\14751\\Desktop\\test01.txt");
// 2. 將file轉(zhuǎn)成FileInputStream
FileInputStream fis = new FileInputStream(file);
// 3. 獲取通道
FileChannel channel = fis.getChannel();
// 4. 創(chuàng)建緩沖區(qū)
ByteBuffer buffer = ByteBuffer.allocate((int)file.length());
// 5. 將通道數(shù)據(jù)讀到buffer中
channel.read(buffer);
System.out.println(new String(buffer.array()));
// 6. 關(guān)閉資源
fis.close();
channel.close();
}
}
public class NioFileChannel03 {
public static void main(String[] args) throws IOException {
// 1. 讀取源文件
FileInputStream fis = new FileInputStream("C:\\Users\\14751\\Desktop\\test01.txt");
// 2. 獲取通道
FileChannel sourceChannel = fis.getChannel();
// 3. 加載目標(biāo)文件
FileOutputStream fos = new FileOutputStream("C:\\Users\\14751\\Desktop\\test02.txt");
// 4. 獲取通道
FileChannel targetChannel = fos.getChannel();
// 5. 創(chuàng)建緩沖區(qū)
ByteBuffer buffer = ByteBuffer.allocate(1024);
while (true) {
// 6. 標(biāo)志位復(fù)位,一定不能漏了這步,否則死循環(huán)
buffer.clear();
// 7. 讀取數(shù)據(jù)
int read = sourceChannel.read(buffer);
if (read == -1) {
break;
}
// 8. 切換到寫(xiě)數(shù)據(jù)模式,并將buffer中的數(shù)據(jù)寫(xiě)入到targetChannel
buffer.flip();
targetChannel.write(buffer);
}
// 9. 關(guān)閉資源
fis.close();
sourceChannel.close();
fos.close();
targetChannel.close();
}
}
public class NioFileChannel04 {
public static void main(String[] args) throws IOException {
// 1. 讀取源文件
FileInputStream fis = new FileInputStream("C:\\Users\\14751\\Desktop\\test01.txt");
// 2. 獲取通道
FileChannel sourceChannel = fis.getChannel();
// 3. 加載目標(biāo)文件
FileOutputStream fos = new FileOutputStream("C:\\Users\\14751\\Desktop\\test03.txt");
// 4. 獲取通道
FileChannel targetChannel = fos.getChannel();
// 5. 使用transferFrom完成拷貝
targetChannel.transferFrom(sourceChannel, 0, sourceChannel.size());
// 6. 關(guān)閉資源
fis.close();
sourceChannel.close();
fos.close();
targetChannel.close();
}
}
public class NioFileChannel05 {
public static void main(String[] args) throws IOException {
// 1. 加載文件
RandomAccessFile file = new RandomAccessFile("C:\\Users\\14751\\Desktop\\test01.txt", "rw"); // rw表示讀寫(xiě)
// 2. 獲取文件通道
FileChannel channel = file.getChannel();
// 3. 獲取MappedByteBuffer,這三個(gè)參數(shù),第一個(gè)表示讀寫(xiě)模式,第二個(gè)表示直接修改的起始位置,第三個(gè)表示映射到內(nèi)存中的大小
MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, 5);
// 4. 對(duì)test01.txt進(jìn)行修改
buffer.put(0, (byte)'A'); // 第一個(gè)字符改成A
buffer.put(1, (byte)'B'); // 第二個(gè)字符改成B
// 5. 關(guān)閉資源
file.close();
channel.close();
}
}
/**
* scattering:將數(shù)據(jù)寫(xiě)入到buffer時(shí),可以采用buffer數(shù)組,依次寫(xiě)入
* gathering:從buffer讀數(shù)據(jù)的時(shí)候,可以采用buffer數(shù)組,依次讀取
* @author zhu
*
*/
public class NioFileChannel06 {
public static void main(String[] args) throws IOException {
// 1. 創(chuàng)建channel
ServerSocketChannel serverChannel = ServerSocketChannel.open();
// 2. 綁定端口并啟動(dòng)
InetSocketAddress address = new InetSocketAddress(6666);
serverChannel.socket().bind(address);
// 3. 創(chuàng)建buffer數(shù)組
ByteBuffer[] buffers = new ByteBuffer[2];
buffers[0] = ByteBuffer.allocate(5);
buffers[1] = ByteBuffer.allocate(4);
// 4. 等待客戶端連接
SocketChannel channel = serverChannel.accept();
// 5. 循環(huán)讀取
// 假設(shè)客戶端會(huì)發(fā)送8個(gè)字節(jié)
int len = 8;
while (true) {
int read = 0;
while (read < len) {
long byteNum = channel.read(buffers);
read += byteNum;
System.out.println("讀取到的字節(jié)數(shù):" + read);
}
// 6. 切換模式
Arrays.asList(buffers).forEach(buffer -> buffer.flip());
// 7. 將讀取到的數(shù)據(jù)顯示到客戶端
long writeLen = 0;
while (writeLen < len) {
long byteNum = channel.write(buffers);
writeLen += byteNum;
}
// 8. 將所有buffer進(jìn)行clear
Arrays.asList(buffers).forEach(buffer -> buffer.clear());
}
}
}
4、selector:
selector能夠檢測(cè)多個(gè)通道是否有事件要發(fā)生,多個(gè)channel以事件的方式可以注冊(cè)到同一個(gè)selector中。主要工作流程如下:
當(dāng)客戶端連接時(shí),會(huì)通過(guò)severSocket channel得到對(duì)應(yīng)的socketChannel,并且將socketChannel通過(guò)register方法注冊(cè)到selector中,注冊(cè)后返回一個(gè)selectionKey;
selector通過(guò)集合關(guān)聯(lián)這個(gè)selectionKey;
selector通過(guò)select方法進(jìn)行監(jiān)聽(tīng)(select方法是阻塞的,也可以傳入超時(shí)時(shí)間,阻塞指定的時(shí)間,還可以用selectNow方法,這個(gè)就是非阻塞的;NIO的非阻塞也就體現(xiàn)在這里),返回有事件發(fā)生的通道的個(gè)數(shù);
selector可以得到有事件發(fā)生的通道的selectionKey;
通過(guò)selectionKey,就可以得到它對(duì)應(yīng)的通道,然后就可以完成業(yè)務(wù)操作了。
---
看一個(gè)實(shí)操案例:用NIO實(shí)現(xiàn)服務(wù)端和客戶端的通訊:
public class NIOServer {
public static void main(String[] args) throws IOException {
// 1. 創(chuàng)建NIOServerSocketChannel
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
// 2. 得到Selector對(duì)象
Selector selector = Selector.open();
// 3. 綁定端口,進(jìn)行監(jiān)聽(tīng)
serverSocketChannel.socket().bind(new InetSocketAddress(6666));
// 4. 設(shè)置為非阻塞
serverSocketChannel.configureBlocking(false);
// 5. 把serverSocketChannel注冊(cè)到selector中,設(shè)置關(guān)心事件為 OP_ACCEPT
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
// 6. 循環(huán)等待客戶端連接
while (true) {
if (selector.select(1000) == 0) { // 沒(méi)有事件
System.out.println("服務(wù)器等待了1秒鐘,沒(méi)有事件發(fā)生");
continue;
} else { // 有事件
// 7. 有事件發(fā)生,就拿到selectionKey的集合
Set<SelectionKey> selectionKeys = selector.selectedKeys();
// 8. 通過(guò)selectionKeys得到channel
Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
while (keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
// 9. 根據(jù)key的不同事件,做對(duì)應(yīng)的處理
if (key.isAcceptable()) { // 如果是OP_ACCEPT連接事件
// 10. 為該客戶端生成一個(gè)SocketChannel并設(shè)置成非阻塞
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
// 11. 將當(dāng)前socketChannel也注冊(cè)到selector中,關(guān)注事件為OP_READ,并且關(guān)聯(lián)一個(gè)Buffer
socketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(1024));
}
if (key.isReadable()) { // 如果是OP_READ讀取事件
// 12. 通過(guò)key得到channel
SocketChannel channel = (SocketChannel) key.channel();
// 13. 獲取到該channel關(guān)聯(lián)的buffer
ByteBuffer buffer = (ByteBuffer) key.attachment();
// 14. 將channel中的數(shù)據(jù)讀到buffer中去
channel.read(buffer);
System.out.println("客戶端發(fā)送的數(shù)據(jù):" + new String(buffer.array()));
}
// 15. 移除當(dāng)前的selectionKey,防止重復(fù)操作
keyIterator.remove();
}
}
}
}
}
public class NIOClient {
public static void main(String[] args) throws IOException {
// 1. 設(shè)置ip和端口
InetSocketAddress address = new InetSocketAddress("127.0.0.1", 6666);
// 2. 創(chuàng)建SocketChannel并設(shè)置成非阻塞
SocketChannel socketChannel = SocketChannel.open(address);
socketChannel.configureBlocking(false);
// 3. 連接服務(wù)器
String str = "hello world";
ByteBuffer buffer = ByteBuffer.wrap(str.getBytes());
// 4. 將數(shù)據(jù)寫(xiě)入channel
socketChannel.write(buffer);
System.in.read();
}