|
一、io_uring介紹 io_uring是 Linux 于 2019 年加入到內(nèi)核的一種新型異步 I/O 模型,io_uring 主要為了解決 原生AIO(Native AIO) 存在的一些不足之處。下面介紹一下原生 AIO 的不足之處: 系統(tǒng)調(diào)用開銷大:提交 I/O 操作和獲取 I/O 操作的結(jié)果都需要通過(guò)系統(tǒng)調(diào)用完成,而觸發(fā)系統(tǒng)調(diào)用時(shí),需求進(jìn)行上下文切換。在高 IOPS(Input/Output Per Second)的情況下,進(jìn)行上下文切換也會(huì)消耗大量的CPU時(shí)間。 僅支持 Direct I/O(直接I/O):在使用原生 AIO 的時(shí)候,只能指定 O_DIRECT 標(biāo)識(shí)位(直接 I/O),不能借助文件系統(tǒng)的頁(yè)緩存(page cache)來(lái)緩存當(dāng)前的 I/O 請(qǐng)求。 對(duì)數(shù)據(jù)有大小對(duì)齊限制:所有寫操作的數(shù)據(jù)大小必須是文件系統(tǒng)塊大?。ㄒ话銥?KB)的倍數(shù),而且要與內(nèi)存頁(yè)大小對(duì)齊。 數(shù)據(jù)拷貝開銷大:每個(gè) I/O 提交需要拷貝 64+8 字節(jié),每個(gè) I/O 完成結(jié)果需要拷貝 32 字節(jié),總共 104 字節(jié)的拷貝。這個(gè)拷貝開銷是否可以承受,和單次 I/O 大小有關(guān):如果需要發(fā)送的 I/O 本身就很大,相較之下,這點(diǎn)消耗可以忽略。而在大量小 I/O 的場(chǎng)景下,這樣的拷貝影響比較大。 對(duì)應(yīng)io_uring就有他的優(yōu)勢(shì): 減少系統(tǒng)調(diào)用:io_uring通過(guò)內(nèi)核態(tài)和用戶態(tài)共享內(nèi)存的方式進(jìn)行通信。如下圖: 用戶態(tài)和內(nèi)核態(tài)之間通過(guò)共享內(nèi)存維護(hù)了三部分:提交隊(duì)列、完成隊(duì)列、提交隊(duì)列表項(xiàng)數(shù)組。 提交隊(duì)列是一整塊連續(xù)的內(nèi)存空間存儲(chǔ)的環(huán)形隊(duì)列,用于存放將要執(zhí)行I/O操作的數(shù)據(jù)。 完成隊(duì)列也是一整塊連續(xù)的內(nèi)存空間存儲(chǔ)的環(huán)形隊(duì)列,其中存放了I/O操作完成返回的結(jié)果。 提交隊(duì)列表項(xiàng)數(shù)組是以數(shù)組形式將要執(zhí)行的I/O操作組織在一起,提交隊(duì)列完成隊(duì)列分別通過(guò)指針指向?qū)?yīng)表項(xiàng)(內(nèi)部通過(guò)偏移量實(shí)現(xiàn))完成對(duì)應(yīng)操作。如下圖所示: 提交隊(duì)列具體實(shí)現(xiàn)為:io_uring_sq結(jié)構(gòu)體。 struct io_uring_sq { unsigned *khead; unsigned *ktail; unsigned *kflags; unsigned *kdropped; unsigned *array; struct io_uring_sqe *sqes; unsigned sqe_head; unsigned sqe_tail; size_t ring_sz; void *ring_ptr; unsigned ring_mask; unsigned ring_entries; unsigned pad[2]; }; AI生成項(xiàng)目 cpp 運(yùn)行 提交隊(duì)列通過(guò)struct io_uring_sqe *sqes提交隊(duì)列表項(xiàng)數(shù)組,使用khead、ktail指向?qū)?yīng)隊(duì)頭和對(duì)尾完成應(yīng)用層提交的IO任務(wù)的處理。同理完成隊(duì)列也是如此,不過(guò)是由內(nèi)核態(tài)將對(duì)應(yīng)IO事件執(zhí)行完成后將結(jié)果寫入到對(duì)應(yīng)表項(xiàng)。應(yīng)用層通過(guò)完成隊(duì)列表項(xiàng)即可獲取到最終結(jié)果。 整體流程為: 請(qǐng)求時(shí):1、應(yīng)用創(chuàng)建SQE,更新SQ tail 2、內(nèi)核消費(fèi)SQE,更新SQ head。內(nèi)核開始處理任務(wù),處理完成后:1、內(nèi)核為完成的一個(gè)或多個(gè)請(qǐng)求創(chuàng)建CQE,更新CQ tail 2、應(yīng)用層消費(fèi)CQE,更新CQ head。 二、liburing安裝 上文介紹的io_uring均為內(nèi)核層支持,在內(nèi)核層提供了三個(gè)API: io_uring_setup(2) io_uring_register(2) io_uring_enter(2) 為方便使用直接安裝liburing即可在應(yīng)用層使用io_uring。liburing為作者Axboe封裝好的用戶態(tài)庫(kù)。 實(shí)驗(yàn)環(huán)境:vmware 17安裝ubuntu22.04 、內(nèi)核版本:6.8.0-60-generic 選用源碼安裝方式,安裝liburing。 1、獲取源碼:git clone https://github.com/axboe/liburing.git 2、編譯: cd liburing-master ./configure make -j sudo make install AI生成項(xiàng)目 bash 三、代碼實(shí)現(xiàn) #include <stdio.h> #include <liburing.h> #include <netinet/in.h> #include <string.h> #include <unistd.h> #define BUFFER_LENGTH 1024 #define ENTRIES_LENGTH 1024 #define EVENT_ACCEPT 0 #define EVENT_READ 1 #define EVENT_WRITE 2 // io事件信息 struct req_info{ int fd; int event; }; // 初始化tcp fd int init_server(unsigned int port){ int socket_fd = socket(AF_INET, SOCK_STREAM, 0); struct sockaddr_in serveraddr; memset(&serveraddr, 0, sizeof(struct sockaddr_in)); serveraddr.sin_addr.s_addr = htonl(INADDR_ANY); serveraddr.sin_family = AF_INET; serveraddr.sin_port = htons(port); if(-1 == bind(socket_fd, (struct sockaddr *)&serveraddr, sizeof(struct sockaddr))){ perror("bind error!\n"); } listen(socket_fd, 10); return socket_fd; } // 設(shè)置accept事件 int set_event_accept(struct io_uring *ring, int sockfd, struct sockaddr* addr, socklen_t *addrlen, int flags){ struct io_uring_sqe *sqe = io_uring_get_sqe(ring); struct req_info accept_info = { .fd = sockfd, .event = EVENT_ACCEPT }; memcpy(&sqe->user_data, &accept_info, sizeof(struct req_info)); io_uring_prep_accept(sqe, sockfd, addr, addrlen, flags); return 0; } // 設(shè)置recv事件 int set_event_recv(struct io_uring *ring, int sockfd, void *buf, size_t len, int flags){ struct io_uring_sqe *sqe = io_uring_get_sqe(ring); struct req_info recv_info = { .fd = sockfd, .event = EVENT_READ }; memcpy(&sqe->user_data, &recv_info, sizeof(struct req_info)); io_uring_prep_recv(sqe, sockfd, buf, len, flags); return 0; } // 設(shè)置send事件 int set_event_send(struct io_uring *ring, int sockfd, void *buf, size_t len, int flags){ struct io_uring_sqe *sqe = io_uring_get_sqe(ring); struct req_info send_info = { .fd = sockfd, .event = EVENT_WRITE }; memcpy(&sqe->user_data, &send_info, sizeof(struct req_info)); io_uring_prep_send(sqe, sockfd, buf, len, flags); return 0; } int main(int argc, void *argv[]){ unsigned int port = 9999; int sockfd = init_server(port); struct io_uring_params params; memset(¶ms, 0, sizeof(struct io_uring_params)); // 初始化io_uring 其中包含了sq和cq struct io_uring ring; io_uring_queue_init_params(ENTRIES_LENGTH, &ring, ¶ms); struct sockaddr_in clientaddr; socklen_t len = sizeof(clientaddr); set_event_accept(&ring, sockfd, (struct sockaddr*)&clientaddr, &len, 0); while(1){ char buffer[BUFFER_LENGTH] = {0}; // 事件提交 io_uring_submit(&ring); // 等待事件完成,其他開發(fā)場(chǎng)景下非必要情況無(wú)需等待 struct io_uring_cqe *cqe; io_uring_wait_cqe(&ring, &cqe); // 獲取完成事件 struct io_uring_cqe *cqe_list[128]; int nready = io_uring_peek_batch_cqe(&ring, cqe_list, 128); // echo邏輯處理 for(int i = 0; i < nready; ++i){ struct io_uring_cqe *entries = cqe_list[i]; struct req_info result; memcpy(&result, &entries->user_data, sizeof(struct req_info)); if(result.event == EVENT_ACCEPT){ printf("accept client\n"); set_event_accept(&ring, sockfd, (struct sockaddr*)&clientaddr, &len, 0); int connfd = entries->res; set_event_recv(&ring, connfd, buffer, BUFFER_LENGTH, 0); }else if(result.event == EVENT_READ){ int connfd = entries->res; if(connfd == 0){ printf("close fd:%d\n", result.fd); close(result.fd); }else if(connfd > 0){ printf("recv: len:%d, data:%s\n", connfd, buffer); set_event_send(&ring, result.fd, buffer, connfd, 0); }else{ printf("error recv!\n"); close(result.fd); } }else if(result.event == EVENT_WRITE){ int ret = entries->res; printf("set_event_send ret: %d, %s\n", ret, buffer); set_event_recv(&ring, result.fd, buffer, BUFFER_LENGTH, 0); } } // 清除完成隊(duì)列中完成表項(xiàng),以免事件重復(fù)處理 io_uring_cq_advance(&ring, nready); } return 0; } AI生成項(xiàng)目 cpp 運(yùn)行 |
|
|