內(nèi)核如何從網(wǎng)卡接收數(shù)據(jù),傳統(tǒng)的過程:
1.數(shù)據(jù)到達網(wǎng)卡;
2.網(wǎng)卡產(chǎn)生一個中斷給內(nèi)核;
3.內(nèi)核使用I/O指令,從網(wǎng)卡I/O區(qū)域中去讀取數(shù)據(jù);
我們在許多網(wǎng)卡驅(qū)動中(很老那些),都可以在網(wǎng)卡的中斷函數(shù)中見到這一過程。
但是,這一種方法,有一種重要的問題,就是大流量的數(shù)據(jù)來到,網(wǎng)卡會產(chǎn)生大量的中斷,內(nèi)核在中斷上下文中,會浪費大量的資源來處理中斷本身。所以,就有一個問題,“可不可以不使用中斷”,這就是輪詢技術(shù),所謂NAPI技術(shù),說來也不神秘,就是說,內(nèi)核屏蔽中斷,然后隔一會兒就去問網(wǎng)卡,“你有沒有數(shù)據(jù)???”……
從這個描述本身可以看到,如果數(shù)據(jù)量少,輪詢同樣占用大量的不必要的CPU資源,大家各有所長吧
OK,另一個問題,就是從網(wǎng)卡的I/O區(qū)域,包括I/O寄存器或I/O內(nèi)存中去讀取數(shù)據(jù),這都要CPU去讀,也要占用CPU資源,“CPU從I/O區(qū)域讀,然后把它放到內(nèi)存(這個內(nèi)存指的是系統(tǒng)本身的物理內(nèi)存,跟外設(shè)的內(nèi)存不相干,也叫主內(nèi)存)中”。于是自然地,就想到了DMA技術(shù)——讓網(wǎng)卡直接從主內(nèi)存之間讀寫它們的I/O數(shù)據(jù),CPU,這兒不干你事,自己找樂子去:
1.首先,內(nèi)核在主內(nèi)存中為收發(fā)數(shù)據(jù)建立一個環(huán)形的緩沖隊列(通常叫DMA環(huán)形緩沖區(qū))。
2.內(nèi)核將這個緩沖區(qū)通過DMA映射,把這個隊列交給網(wǎng)卡;
3.網(wǎng)卡收到數(shù)據(jù),就直接放進這個環(huán)形緩沖區(qū)了——也就是直接放進主內(nèi)存了;然后,向系統(tǒng)產(chǎn)生一個中斷;
4.內(nèi)核收到這個中斷,就取消DMA映射,這樣,內(nèi)核就直接從主內(nèi)存中讀取數(shù)據(jù);
——呵呵,這一個過程比傳統(tǒng)的過程少了不少工作,因為設(shè)備直接把數(shù)據(jù)放進了主內(nèi)存,不需要CPU的干預(yù),效率是不是提高不少?
對應(yīng)以上4步,來看它的具體實現(xiàn):
1)分配環(huán)形DMA緩沖區(qū)
Linux內(nèi)核中,用skb來描述一個緩存,所謂分配,就是建立一定數(shù)量的skb,然后用e1000_rx_ring 環(huán)形緩沖區(qū)隊列描述符連接起來
2)建立DMA映射
內(nèi)核通過調(diào)用
dma_map_single(struct device *dev,void *buffer,size_t size,enum dma_data_direction direction)
建立映射關(guān)系。
struct device *dev 描述一個設(shè)備;
buffer:把哪個地址映射給設(shè)備;也就是某一個skb——要映射全部,當然是做一個雙向鏈表的循環(huán)即可;
size:緩存大?。?br>direction:映射方向——誰傳給誰:一般來說,是“雙向”映射,數(shù)據(jù)在設(shè)備和內(nèi)存之間雙向流動;
對于PCI設(shè)備而言(網(wǎng)卡一般是PCI的),通過另一個包裹函數(shù)pci_map_single,這樣,就把buffer交給設(shè)備了!設(shè)備可以直接從里邊讀/取數(shù)據(jù)。
3)這一步由硬件完成;
4)取消映射
dma_unmap_single,對PCI而言,大多調(diào)用它的包裹函數(shù)pci_unmap_single,不取消的話,緩存控制權(quán)還在設(shè)備手里,要調(diào)用它,把主動權(quán)掌握在CPU手里——因為我們已經(jīng)接收到數(shù)據(jù)了,應(yīng)該由CPU把數(shù)據(jù)交給上層網(wǎng)絡(luò)棧;當然,不取消之前,通常要讀一些狀態(tài)位信息,諸如此類,一般是調(diào)用dma_sync_single_for_cpu()讓CPU在取消映射前,就可以訪問DMA緩沖區(qū)中的內(nèi)容。
原代碼分析
基于linux v2.6.26
//e1000_probe 網(wǎng)卡初始化 (重點關(guān)注兩部分 1注冊poll函數(shù) 2設(shè)置接收緩沖的大小)
static int __devinit e1000_probe(struct pci_dev *pdev,const struct pci_device_id *ent){
struct net_device *netdev;
struct e1000_adapter *adapter;
....
err=pci_enable_device(pdev);
...
err=pci_set_dma_mask(pdev,DMA_64BIT_MASK); //設(shè)置pci設(shè)備的dma掩碼
...
netdev = alloc_etherdev(sizeof(struct e1000_adapter)); //為e1000網(wǎng)卡對應(yīng)的net_device結(jié)構(gòu)分配內(nèi)存
...
pci_set_drvdata(pdev,netdev);
adapter=netdev_priv(netdev);
adapter->netdev=netdev;
adapter->pdev=pdev;
...
mmio_start = pci_resource_start(pdev,0);
mmio_len = pci_resource_len(pdev,0);
....
/*將e1000網(wǎng)卡驅(qū)動的相應(yīng)函數(shù)注冊到net_device中*/
netdev->open = &e1000_open;
netdev->stop = &e1000_close;
...
netif_napi_add(netdev,&adapter->napi,e1000_clean,64); // 注冊poll函數(shù)為e1000_clean, weight為64
...
netdev->mem_start = mmio_start;
netdev->mem_end = mmio_start+mmio_len;
....
if(e1000e_read_mac_addr(&adapter->hw)) ndev_err(...); //從網(wǎng)卡設(shè)備的EEPROM中讀取mac地址
memcpy(netdev->dev_addr, adapter->hw.mac.addr, netdev->addr_len);
memcpy(netdev->perm_addr, adapter->hw.mac.addr, netdev->addr_len);
....
adapter->rx_ring->count = 256; //設(shè)置接收環(huán)型緩沖區(qū)隊列的缺省大小
...
e1000_reset(adapter);
...
strcpy(netdev->name,"eth%d");
err= register_netdev(netdev); //將當前網(wǎng)絡(luò)設(shè)備注冊到系統(tǒng)的dev_base[]設(shè)備數(shù)組當中
....
return 0;
}
e1000_open 各種數(shù)據(jù)結(jié)構(gòu)初始化 (環(huán)形緩沖區(qū)隊列的初始化)
static int e1000_open(struct net_device *netdev){
struct e1000_adapter *adapter = netdev_priv(netdev);
....
err = e1000_setup_all_rx_resoures(adapter) //預(yù)先分配緩沖區(qū)資源
....
err = e1000_request_irq(adapter); //分配irq中斷
....
}
int e1000_setup_all_rx_resources(struct e1000_adapter *adapter){
int i,err=0;
for(i=0 ; i<adapter->num_rx_queues ; i++){
err = e1000_setup_rx_resources(adapter,&adapter->rx_ring[i]);
if(err){
...
}
}
return err;
}
e1000_rx_ring 環(huán)形緩沖區(qū)隊列(接收緩沖隊列由多個描述符組成,每個描述符中都包含一個緩沖區(qū)buffer,該buffer以dma方式存放數(shù)據(jù)包,整個緩沖隊列以環(huán)形排列 每個描述符都有一個狀態(tài)變量以表示該緩沖區(qū)buffer是否可以被新到的數(shù)據(jù)包覆蓋)
struct e1000_rx_ring{
void *desc; //指向該環(huán)形緩沖區(qū)
dma_addr_t dma; //dma物理地址
unsigned int size;
unsigned int count; //環(huán)形隊列由多少個描述符組成,這個在probe中定義了
unsigned int next_to_use; //下一個可使用的描述符號
unsigned int next_to_clean; //該描述符狀態(tài)(是否正在使用,是否臟)
struct e1000_buffer *buffer_info; //緩沖區(qū)buffer
...
}
struct e1000_buffer{
struct sk_buff *skb;
....
}
static int e1000_setup_rx_resources(struct e1000_adapt *adapter, struct e1000_rx_ring *rxdr){
struct pci_dev *pdev = adapter->pdev;
int size,desc_len;
size = sizeof(struct e1000_buffer) * rxdr->count;
rxdr->buffer_info = vmalloc(size);
memset(rxdr->buffer_info,0,size); //分配buffer所使用的內(nèi)存
....
if(adapter->hw.mac_type <= e1000_82547_rec_2)
desc_len = sizeof(struct e1000_rx_desc);
else ....
rxdr->size = rxdr->count * desc_len;
rxdr->size = ALIGN(rxdr->size,4096);
rxdr->desc = pci_alloc_consistent(pdev,rxdr->size,&rxdr->dma);
...
memset(rxdr->desc,0,rxdr->size); //分配緩沖隊列所使用的內(nèi)存
rxdr->next_to_clean =0;
rxdr->next_to_use =0;
return 0;
}
e1000_up 啟動網(wǎng)卡函數(shù) 調(diào)用alloc_rx_buf來建立環(huán)形緩沖隊列
int e1000_up(struct e1000_adapter *adapter){
e1000_configure(adatper);
....
}
static void e1000_configure(struct e1000_adapter *adapter){
struct net_device *netdev = adapter->netdev;
int i;
...
e1000_configure_rx(adapter);
...
for (i=0;i<adapter->num_rx_queues;i++){
struct e1000_rx_ring *ring = &adapter ->rx_ring[i];
adapter->alloc_rx_buf(adapter,ring,E1000_DESC_UNUSED(ring)); //從這里就可以看出 環(huán)形緩沖區(qū)并不是一開始就完全建好的,建了部分
}
...
}
static void e1000_configure_rx(struct e1000_adapter *adapter){
....
adapter->clean_rx = e1000_clean_rx_irq; //后面會提到的poll()
adapter->alloc_rx_buf = e1000_alloc_rx_irq //建立環(huán)形緩沖隊列函數(shù) 這里實際調(diào)用的是e1000_alloc_rx_buffers
}
e1000_alloc_rx_buffers ----因為其中有些參數(shù)要看完下面的才能理解,所以這個函數(shù)最后再寫
e1000_intr e1000的中斷處理函數(shù)
static irqreturn_t e1000_intr(int irq,void *data){
struct net_device *netdev = data;
struct e1000_adapter *adapter = netdev_priv(netdev);
..
u32 icr = E1000_READ_REG(hw,ICR);
#ifdef CONFIG_E1000_NAPI
int i;
#endif
...
#ifdef CONFIG_E1000_NAPI //進入輪詢模式
if(unlikely(hw->mac_type<e1000_82571)){
E1000_WRITE_REG(hw,IMC,~0); //關(guān)閉中斷
E1000_WRITE_FLUSH(hw);
}
if (likely(netif_rx_schedule_prep(netdev,&adapter->napi))){ //確定該設(shè)備處于運行狀態(tài), 而且還未被添加到網(wǎng)絡(luò)層的poll隊列中
...
__netif_rx_schedule(netdev,&adapter->napi); //將當前設(shè)備netdevice加到與cpu相關(guān)的softnet_data的輪旬設(shè)備列表poll_list中并觸發(fā)NET_RX_SOFTIRQ軟中斷
}
#else //進入中斷模式
{ ...
for(i=0;i<E1000_MAX_INTR;i++){
if (unlikely(!adapter->clean_rx(adapter, adapter->rx_ring) &.... //執(zhí)行clean_rx()中關(guān)于中斷模式的代碼 不走napi路徑
break;
...
}
}
....
return IRQ_HANDLED;
}
static inline int netif_rx_schedule_prep(struct net_device *dev,struct napi_struct *napi){
return napi_schedule_prep(napi);
}
static inline int napi_schedule_prep(struct napi_struct *n){
return !napi_disable_pending(n) &&
!test_and_set_bit(NAPI_STATE_SCHED, &n->state); //測試該設(shè)備是否已被被加到poll隊列
}
static inline int napi_disable_pending (struct napi_struct *n){
return test_bit(NAPI_STATE_DISABLE,&n->state); //測試該設(shè)備是否停止運行
}
static inline void __netif_rx_schedule(struct net_device *dev,struct napi_struct *napi){
__napi_schedule(napi);
}
void __napi_schedule(struct napi_struct *n){
unsigned long flags;
local_irq_save(flags);
list_add_tail(&n->poll_list, &__get_cpu_var(softnet_data).poll_list);
__raise_softirq_irqoff(NET_RX_SOFTIRQ); 觸發(fā)軟中斷
local_irq_restore(flags);
}
#define __raise_softirq_irqoff(nr) do {or_softirq_pending(iUL<<(nr)); } while(0)
用到的數(shù)據(jù)結(jié)構(gòu)napi_struct
struct napi_struct{
struct list_head poll_list; //poll_list鏈表
unsigned long state //設(shè)備狀態(tài)信息 往上看看
int weight; //設(shè)備預(yù)處理數(shù)據(jù)配額,作用是輪詢時更公平的對待各個設(shè)備
int (*poll) (strcut napi_struct *,int);
.....
}
接下來就是軟中斷處理函數(shù)net_rx_action()
static void net_rx_action(struct softirq_action *h){
struct list_head *list = &__get_cpu_var(softnet_data).poll_list;
unsigned long start_time = jiffies;
int budget = netdev_budget; //處理限額,一次最多只能處理這么多數(shù)據(jù)包
....
local_irq_disable();
while (!list_empty(list)){
struct napi_struct *n;
int work,
weight;
if (unlikely(budget < 0 || jiffies != start_time)) //如果實際工作量work 超過限額,或處理時間超過1秒,則出于系統(tǒng)響應(yīng)考慮立即從軟中斷處理函數(shù)中跳出來, work是poll的返回值 限額budget每次都會根據(jù)返回的work值重新計算 ,配額weight和work配合來實現(xiàn)輪詢算法,具體算法要看完e1000_clean(),e1000_rx_irq()才能清楚
goto softnet_break;
local_irq_enalbe();
n = list_entry(list->next,struct napi_struct,poll_list);
weight = n->weight;
work 0;
if (test_bit(NAPI_STATE_SCHED,&n->state))
work = n->poll(n,weight); //調(diào)用設(shè)備的poll函數(shù)e1000_clean()
....
budget -= work; //更新限額
local_irq_disable();
if (unlikely(work == weight)){ //處理量大于配額
if(unlikely(napi_disable_pending(n)))
__napi_complete(n);
else
list_move_tail(&n->poll_list,list); //該設(shè)備還有要接收的數(shù)據(jù)沒被處理,因為輪詢算法 被移動到poll_llst尾部等待處理
}
...
}
out:
local_irq_enable();
...
return;
softnet_break:
__raise_softirq_irqoff(NET_RX_SOFTIRQ);
goto out;
}
e1000網(wǎng)卡poll函數(shù) e1000_clean()
static int e1000_clean(struct napi_struct *napi,int budget){ //此處的budget實際上是傳過來的weight,不要和上面的budget弄混了
struct e1000_adapter *adapter = container_of(napi,struct e1000_adapter,napi);
struct net_device *poll_dev = adapter->netdev;
int work_done = 0;
adapter = poll_dev->priv;
.....
adapter ->clean_rx(adapter,&adapter ->rx_ring[0],&work_done,budget); //實際調(diào)用的是clean_rx_irq()
...
if(work_done<budget){ //如果完成的工作量(已處理了的接收數(shù)據(jù))小于weight(budget=weight), 則說明處理完成了該設(shè)備所有要接收的數(shù)據(jù)包, 之后調(diào)用netif_rx_complete()將該設(shè)備從poll_list鏈表中刪除,并打開中斷退出輪詢模式
...
netif_rx_complete(poll_dev,napi); //__napi_complete()的包裝函數(shù)
e1000_irq_enable(adapter); //開中斷
}
return work_done;
}
static inline void netif_rx_complete(struct net_device *dev,struct napi_struct *napi){
unsigned long flags;
local_irq_save(flags);
__netif_rx_complete(dev,napi);
local_irq_restore(flags);
}
static inline void __netif_rx_complete(struct net_device *dev,struct napi_struct *napi){
__napi_complete(napi);
}
static inline void __napi_complete(struct napi_struct *n){
....
list_del(&n->poll_list);
clear_bit(NAPI_STATE_SCHED,&n->state);
}
設(shè)備輪詢接收機制中最重要的函數(shù)e1000_clean_rx_irq()
#ifdef CONFIG_E1000_NAPI
e1000_clean_rx_irq(struct e1000_adapter *adapter,struct e1000_rx_ring *rx_ring,int *work_done,int work_to_do) //work_to_do實際上是傳過來的配額weight
.....
{
struct net_device *netdev = adapter->netdev;
struct pci_dev *pdev = adapter->pdev;
struct e1000_rx_desc *rx_desc,*next_rxd;
struct e1000_buffer *buffer_info, *next_buffer;
...
unsigned int i;
int cleaned_count = 0;
....
i = rx_ring->next_to_clean; //next_to_clean是下一個可以被清除的描述符索引,上面講過環(huán)形緩沖隊列由多個描述符組成,每個描述符都有一個用于存放接收數(shù)據(jù)包的緩沖區(qū)buffer,這里所說的“可以被清除”并不是將其刪除,而是標記這個緩沖區(qū)的數(shù)據(jù)已經(jīng)處理(可能正在處理),但是否處理完了要看rx_desc->status&E1000_RXD_STAT_DD,當有新數(shù)據(jù)需要使用緩沖區(qū)時,只是將已處理的緩沖區(qū)覆蓋而已, 這里的i可以理解為可以被新數(shù)據(jù)覆蓋的緩沖區(qū)序號
rx_desc = E1000_RX_DESC(*rx_ring,i); //得到相應(yīng)的描述符
buffer_info = &rx_ring->buffer_info[i];
while(rx_desc->status & E1000_RXD_STAT_DD){ //測試其狀態(tài)是否為已刪除
struct sk_buff *skb;
u8 status;
#ifdef CONFIG_E1000_NAPI
if (*wrok_done>=work_to_do) //如果所完成的工作>配額則直接退出
break;
(*work_done) ++
#endif
status = rx_desc->status;
skb = buffer_info->skb; //得到緩沖區(qū)中的數(shù)據(jù)
buffer_info->skb = NULL;
prefetch(skb->data-NET_IP_ALIGN);
if(++i == rx_ring->count) //處理環(huán)形緩沖區(qū)達到隊列末尾的情況,因為是環(huán)形的,所以到達末尾的下一個就是隊列頭,這樣整個隊列就不斷地循環(huán)處理。然后獲取下一格描述符的狀態(tài),看看是不是處理刪除狀態(tài)。如果處于就會將新到達的數(shù)據(jù)覆蓋舊的緩沖區(qū),如果不處于則跳出循環(huán),并將當前緩沖區(qū)索引號置為下一次查詢的目標
i = 0;
next_rxd = E1000_RX_DESC(*rx_ring,i);
next_buffer = &rx_ring->buffer_info[i];
cleaned = true ;
cleaned_count ++;
pci_unmap_single(pdev,buffer_info->dma,buffer_info->length,PCI_DMA_FROMDEVICE); //* 取消映射,因為通過DMA,網(wǎng)卡已經(jīng)把數(shù)據(jù)放在了主內(nèi)存中,這里一取消,也就意味著,CPU可以處理主內(nèi)存中的數(shù)據(jù)了 */
....
//checksum
...
#ifdef CONFIG_E1000_NAPI
netif_receive_skb(skb); //交由上層協(xié)議處理 , 如果數(shù)據(jù)包比較大,處理時間會相對較長
#else
netif_rx(skb); //進入中斷模式 將數(shù)據(jù)包插入接收隊列中,等待軟中斷處理 中斷模式不用環(huán)形接收緩沖隊列
#endif
netdev->last_rx = jiffies;
next_desc:
rx_desc->status =0;
if(unlikely(cleaned_count >= E1000_RX_BUFFER_WRITE)){
adapter->alloc_rx_buf(adapter,rx_ring,cleaned_count); //在e1000_up中已經(jīng)調(diào)用了這個函數(shù)為環(huán)形緩沖區(qū)隊列中的每個緩沖區(qū)分配了sk_buff內(nèi)存,但是如果接收到數(shù)據(jù)以后,調(diào)用netif_receive_skb(skb)向上提交數(shù)據(jù)以后,這段內(nèi)存將始終被這個skb占用(直到上層處理完以后才會調(diào)用_kfree_skb釋放),換句話說,就是當前緩沖區(qū)必須重新申請分配sk_buff內(nèi)存,為下一個數(shù)據(jù)作準備
cleaned_count = 0;
}
rx_desc = next_rxd;
buffer_info = next_buffer;
}
rx_ring->next_to_clean = i;
cleaned_count = E1000_DESC_UNUSED(rx_ring);
if(cleaned_count)
adapter->alloc_rx_buf(adapter,rx_ring,cleaned_count);
...
return cleaned;
}
static void e1000_alloc_rx_buffers(struct e1000_adapter *adapter,struct e1000_rx_ring *rx_ring,int cleaned_count){
struct net_device *netdev = adapter->netdev;
struct pci_dev *pdev = adapter->pdev;
struct e1000_rx_desc *rx_desc;
struct e1000_buffer *buffer_info;
struct sk_buff *skb;
unsigned int i;
unsigned int bufsz = adapter->rx_buffer_len+NET_IP_ALIGN;
i=rx_ring->next_to_use;
buffer_info = &rx_ring->buffer_info[i];
while (cleaned_count--){
skb = buffer_info ->skb;
if(skb){
....
}
skb = netdev_alloc_skb(netdev,bufsz); //skb緩存的分配
if(unlikely(!skb)){
adapter->alloc_rx_buff_failed++;
break;
}
skb_reserve(skb,NET_IP_ALIGN);
buffer_info->skb = skb;
buffer_info->length = adapter ->rx_buffer_len;
map_skb:
buffer_info->dma = pci_map_single(pdev,skb->data, adapter->rx_buffer_len,PCI_DMA_FROMDEVICE); //建立DMA映射,把每一個緩沖區(qū)skb->data都映射給了設(shè)備,緩存區(qū)描述符利用dma保存了每一次映射的地址
....
rx_desc = E1000_RX_DESC(*rx_ring, i);
rx_desc->buffer_addr = cpu_to_le64(buffer_info->dma);
if (unlikely(++i == rx_ring->count)) //達到環(huán)形緩沖區(qū)末尾
i =0 ;
buffer_info = &rx_ring->buffer_info[i];
}
if(likely(rx_ring->netx_to_use!=i)){
rx_ring->next_to_use = i;
if (unlikely(i-- == 0))
i = (rx_ring->count - 1);
...
}
}
簡要流程