NAPI 是 Linux 上采用的一种提高网络处理效率的技术,它的核心概念就是不采用中断的方式读取数据,而代之以首先采用中断唤醒数据接收的服务程序,然后 POLL 的方法来轮询数据。随着网络的接收速度的增加,NIC 触发的中断能做到不断减少,目前 NAPI 技术已经在网卡驱动层和网络层得到了广泛的应用,驱动层次上已经有 E1000 系列网卡,RTL8139 系列网卡,3c50X 系列等主流的网络适配器都采用了这个技术,而在网络层次上,NAPI 技术已经完全被应用到了著名的netif_rx 函数中间,并且提供了专门的 POLL 方法–process_backlog 来处理轮询的方法;根据实验数据表明采用NAPI技术可以大大改善短长度数据包接收的效率,减少中断触发的时间。
这里重点要分清硬中断和软中断区别。上半部分下半部分就是说的:
接收时硬中断过程到软中断过程。
发送时用户进程陷入内核到软中断发送。
非NAPI方式 旧版本可以参考道系统方式,新版本netif_rx已集成NAPI技术了。每个数据包到来,都会产生硬件中断,中断处理程序netif_rx->enqueue_to_backlog将收到的包放入当前cpu的收包队列softnet_data->input_pkt_queue中,并且通过enqueue_to_backlog->____napi_schedule将非napi设备对应的虚拟设备napi结构softnet->backlog结构挂在当前cpu的待收包设备链表softnet_data->poll_list中,并触发软中断,软中断处理过程中,所以会调用默认初始化backlog的回调处理函数process_backlog,如果NAPI则是自己注册的。将收包队列input_pkt_queue合并到softdata->process_queue后面,并依次处理该队列中的数据包
NAPI方式 数据包到来,第一个数据包产生硬件中断,中断处理程序将设备的napi_struct结构挂在当前cpu的待收包设备链表softnet_data->poll_list中,并触发软中断,软中断执行过程中,遍历softnet_data->poll_list中的所有设备,依次调用其收包函数napi_sturct->poll,处理收包过程;
中断上半部
e100_intr(中断处理程序)–>__napi_schedule–>____napi_schedule(将设备对应的napi结构加入到当前cpu的待收包处理队列softnet_data->poll_list中,并触发软中断)
数据包到来,第一包产生中断,中断处理程序得到执行,其中关键步骤为调用__napi_schedule(&nic->napi)将设备对应的napi加入到当前cpu的softnet_data->poll_list中;
中断下半部
net_rx_action(软中断收包处理程序)–>napi_poll(执行设备包处理回调napi_struct->poll)
但是 NAPI 存在一些比较严重的缺陷:
对于上层的应用程序而言,系统不能在每个数据包接收到的时候都可以及时地去处理它,而且随 着传输速度增加,累计的数据包将会耗费大量的内存,经过实验表明在 Linux 平台上这个问题会比 在 FreeBSD 上要严重一些;
另外一个问题是对于大的数据包处理比较困难,原因是大的数据包传送到网络层上的时候耗费的 时间比短数据包长很多(即使是采用 DMA 方式),所以正如前面所说的那样,NAPI 技术适用于对 高速率的短长度数据包的处理。
使用 NAPI 先决条件:
驱动可以继续使用老的 2.4 内核的网络驱动程序接口,NAPI 的加入并不会导致向前兼容性的丧失,但是 NAPI 的使用至少要得到下面的保证:
要使用 DMA 的环形输入队列(也就是 ring_dma,这个在 2.4 驱动中关于 Ethernet 的部 分有详细的介绍),或者是有足够的内存空间缓存驱动获得的包。
在发送/接收数据包产生中断的时候有能力关断 NIC 中断的事件处理,并且在关断 NIC 以后,并不影响数据包接收到网络设备的环形缓冲区(以下简称 rx-ring)处理队列中。NAPI 对数据包到达的事件的处理采用轮询方法,在数据包达到的时候,NAPI 就会强制执行dev->poll方法。而和不像以前的驱动那样为了减少包到达时间的处理延迟,通常采用中断的方法来 进行。
E1000网卡驱动程序对NAPI的支持:上面已经介绍过了,使用NAPI需要在编译内核的时候选择打开相应网卡设备的NAPI支持选项,对于E1000网卡来说就是CONFIG_E1000_NAPI宏。E1000网卡的初始化函数,也就是通常所说的probe方法,定义为e1000_probe():
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 static int __devinit e1000_probe (struct pci_dev *pdev, const struct pci_device_id *ent) { struct net_device *netdev ;struct e1000_adapter *adapter ;static int cards_found = 0 ;unsigned long mmio_start;int mmio_len;int pci_using_dac;int i;int err;uint16_t eeprom_data;if ((err = pci_enable_device(pdev))) return err; if (!(err = pci_set_dma_mask(pdev, PCI_DMA_64BIT))) { pci_using_dac = 1 ; } else { if ((err = pci_set_dma_mask(pdev, PCI_DMA_32BIT))) { E1000_ERR("No usable DMA configuration, aborting\n" ); return err; } pci_using_dac = 0 ; } if ((err = pci_request_regions(pdev, e1000_driver_name))) return err; pci_set_master(pdev); netdev = alloc_etherdev(sizeof (struct e1000_adapter)); if (!netdev) { err = -ENOMEM; goto err_alloc_etherdev; } SET_MODULE_OWNER(netdev); pci_set_drvdata(pdev, netdev); adapter = netdev->priv; adapter->netdev = netdev; adapter->pdev = pdev; adapter->hw.back = adapter; mmio_start = pci_resource_start(pdev, BAR_0); mmio_len = pci_resource_len(pdev, BAR_0); adapter->hw.hw_addr = ioremap(mmio_start, mmio_len); if (!adapter->hw.hw_addr) { err = -EIO; goto err_ioremap; } for (i = BAR_1; i <= BAR_5; i++) { if (pci_resource_len(pdev, i) == 0 ) continue ; if (pci_resource_flags(pdev, i) & IORESOURCE_IO) { adapter->hw.io_base = pci_resource_start(pdev, i); break ; } } netdev->open = &e1000_open; netdev->stop = &e1000_close; netdev->hard_start_xmit = &e1000_xmit_frame; netdev->get_stats = &e1000_get_stats; netdev->set_multicast_list = &e1000_set_multi; netdev->set_mac_address = &e1000_set_mac; netdev->change_mtu = &e1000_change_mtu; netdev->do_ioctl = &e1000_ioctl; netdev->tx_timeout = &e1000_tx_timeout; netdev->watchdog_timeo = 5 * HZ; #ifdef CONFIG_E1000_NAPI netdev->poll = &e1000_clean; netdev->weight = 64 ; #endif netdev->vlan_rx_register = e1000_vlan_rx_register; netdev->vlan_rx_add_vid = e1000_vlan_rx_add_vid; netdev->vlan_rx_kill_vid = e1000_vlan_rx_kill_vid; netdev->irq = pdev->irq; netdev->mem_start = mmio_start; netdev->mem_end = mmio_start + mmio_len; netdev->base_addr = adapter->hw.io_base; adapter->bd_number = cards_found; if (pci_using_dac)netdev->features |= NETIF_F_HIGHDMA; e1000_read_mac_addr(&adapter->hw); memcpy (netdev->dev_addr, adapter->hw.mac_addr, netdev->addr_len);if (!is_valid_ether_addr(netdev->dev_addr)) {err = -EIO; goto err_eeprom;} init_timer(&adapter->tx_fifo_stall_timer); adapter->tx_fifo_stall_timer.function = &e1000_82547_tx_fifo_stall; adapter->tx_fifo_stall_timer.data = (unsigned long ) adapter; init_timer(&adapter->watchdog_timer); adapter->watchdog_timer.function = &e1000_watchdog; adapter->watchdog_timer.data = (unsigned long ) adapter; init_timer(&adapter->phy_info_timer); adapter->phy_info_timer.function = &e1000_update_phy_info; adapter->phy_info_timer.data = (unsigned long ) adapter; INIT_TQUEUE(&adapter->tx_timeout_task, (void (*)(void *))e1000_tx_timeout_task, netdev); register_netdev(netdev); netif_carrier_off(netdev); netif_stop_queue(netdev); e1000_check_options(adapter); }
在分析网卡接收数据包的过程中,设备的open方法是值得注意的,因为在这里对网卡设备的各种数据 结构进行了初始化,特别是环形缓冲区队列。E1000网卡驱动程序的open方法注册为e1000_open():
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 static int e1000_open (struct net_device *netdev) { struct e1000_adapter *adapter = netdev->priv;int err;if ((err = e1000_setup_tx_resources(adapter))) goto err_setup_tx; if ((err = e1000_setup_rx_resources(adapter))) goto err_setup_rx; if ((err = e1000_up(adapter))) goto err_up; }
事实上e1000_open() 函数调用了e1000_setup_rx_resources()函数为其环形缓冲区分配资源。 e1000设备的接收方式是一种缓冲方式,能显著的降低 CPU接收数据造成的花费,接收数据之前,软件需要预 先分配一个 DMA 缓冲区,一般对于传输而言,缓冲区最大为 8Kbyte 并且把物理地址链接在描述符的 DMA 地址描述单元,另外还有两个双字的单元表示对应的 DMA 缓冲区的接收状态。
在 /driver/net/e1000/e1000/e1000.h 中对于环形缓冲队列描述符的数据单元如下表示:
1 2 3 4 5 6 7 8 9 10 struct e1000_desc_ring {void *desc; dma_addr_t dma; unsigned int size; unsigned int count; unsigned int next_to_use; unsigned int next_to_clean; struct e1000_buffer *buffer_info ; };
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 static int e1000_setup_rx_resources (struct e1000_adapter *adapter) { struct e1000_desc_ring *rxdr = &adapter->rx_ring;struct pci_dev *pdev = adapter->pdev;int size;size = sizeof (struct e1000_buffer) * rxdr->count; rxdr->buffer_info = kmalloc(size, GFP_KERNEL); if (!rxdr->buffer_info) { return -ENOMEM; } memset (rxdr->buffer_info, 0 , size);rxdr->size = rxdr->count * sizeof (struct e1000_rx_desc); E1000_ROUNDUP(rxdr->size, 4096 ); rxdr->desc = pci_alloc_consistent(pdev, rxdr->size, &rxdr->dma); if (!rxdr->desc) { kfree(rxdr->buffer_info); return -ENOMEM; } memset (rxdr->desc, 0 , rxdr->size);rxdr->next_to_clean = 0 ; rxdr->next_to_use = 0 ; return 0 ;}
在e1000_up()函数中,调用request_irq()向系统申请irq中断号,然后将e1000_intr()中断处理 函数注册到系统当中,系统有一个中断向量表irq_desc[]。然后使能网卡的中断。
接 下来就是网卡处于响应中断的模式,这里重要的函数是 e1000_intr()中断处理函数,关于这个 函数的说明在内核网络设备操作笔记当中,这里就不重复了,但是重点强调的是中断处理函数中对 NAPI部分 的处理方法,因此还是将该函数的源码列出,不过省略了与NAPI无关的处理过程:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 static irqreturn_t e1000_intr (int irq, void *data, struct pt_regs *regs) { struct net_device *netdev = data;struct e1000_adapter *adapter = netdev->priv;uint32_t icr = E1000_READ_REG(&adapter->hw, ICR);#ifndef CONFIG_E1000_NAPI unsigned int i;#endif if (!icr) return IRQ_NONE; #ifdef CONFIG_E1000_NAPI if (netif_rx_schedule_prep(netdev)) {atomic_inc (&adapter->irq_sem);E1000_WRITE_REG(&adapter->hw, IMC, ~0 ); __netif_rx_schedule(netdev); } #else for (i = 0 ; i < E1000_MAX_INTR; i++) if (!e1000_clean_rx_irq(adapter) &!e1000_clean_tx_irq(adapter)) break ; #endif return IRQ_HANDLED;}
下面介绍一下__netif_rx_schedule(netdev)函数的作用:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 static inline void __netif_rx_schedule(struct net_device *dev){ unsigned long flags;int cpu = smp_processor_id();local_irq_save(flags); dev_hold(dev); list_add_tail(&dev->poll_list, &softnet_data[cpu].poll_list); if (dev->quota < 0 ) dev->quota += dev->weight; else dev->quota = dev->weight; __cpu_raise_softirq(cpu, NET_RX_SOFTIRQ); local_irq_restore(flags); }
在内核网络设备操作阅读笔记当中已经介绍过net_rx_action()这个重要的网络接收软中断处理函数了,
不过这里为了清楚的分析轮询机制,需要再次分析这段代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 static void net_rx_action (struct softirq_action *h) { int this_cpu = smp_processor_id();struct softnet_data *queue = &softnet_data[this_cpu];unsigned long start_time = jiffies;int budget = netdev_max_backlog;br_read_lock(BR_NETPROTO_LOCK); local_irq_disable(); while (!list_empty(&queue ->poll_list)) {struct net_device *dev ;if (budget <= 0 || jiffies - start_time > 1 ) goto softnet_break; local_irq_enable(); dev = list_entry(queue ->poll_list.next, struct net_device, poll_list); if (dev->quota <= 0 || dev->poll(dev, &budget)) { local_irq_disable(); list_del(&dev->poll_list); list_add_tail(&dev->poll_list, &queue ->poll_list); if (dev->quota < 0 ) dev->quota += dev->weight; else dev->quota = dev->weight; } else { dev_put(dev); local_irq_disable(); } } local_irq_enable(); br_read_unlock(BR_NETPROTO_LOCK); return ;}
下面介绍一下e1000网卡的轮询poll处理函数e1000_clean(),这个函数只有定义了NAPI宏的情况下才有效:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 #ifdef CONFIG_E1000_NAPI static int e1000_clean (struct net_device *netdev, int *budget) { struct e1000_adapter *adapter = netdev->priv;int work_to_do = min(*budget, netdev->quota);int work_done = 0 ;e1000_clean_tx_irq(adapter); e1000_clean_rx_irq(adapter, &work_done, work_to_do); *budget -= work_done; netdev->quota -= work_done; if (work_done < work_to_do) { netif_rx_complete(netdev); e1000_irq_enable(adapter); } return (work_done >= work_to_do); }
设备轮询接收机制中最重要的函数就是下面这个函数,当然它同时也可以为中断接收机制所用,只不过处理过 程有一定的差别。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 static boolean_t #ifdef CONFIG_E1000_NAPI e1000_clean_rx_irq(struct e1000_adapter *adapter, int *work_done, int work_to_do)#else e1000_clean_rx_irq(struct e1000_adapter *adapter) #endif { struct e1000_desc_ring *rx_ring = &adapter->rx_ring;struct net_device *netdev = adapter->netdev;struct pci_dev *pdev = adapter->pdev;struct e1000_rx_desc *rx_desc ;struct e1000_buffer *buffer_info ;struct sk_buff *skb ;unsigned long flags;uint32_t length;uint8_t last_byte;unsigned int i;boolean_t cleaned = FALSE;i = rx_ring->next_to_clean; rx_desc = E1000_RX_DESC(*rx_ring, i); while (rx_desc->status & E1000_RXD_STAT_DD) { buffer_info = &rx_ring->buffer_info[i]; #ifdef CONFIG_E1000_NAPI if (*work_done >= work_to_do) break ; (*work_done)++; #endif cleaned = TRUE; pci_unmap_single(pdev, buffer_info->dma, buffer_info->length, PCI_DMA_FROMDEVICE); skb = buffer_info->skb; length = le16_to_cpu(rx_desc->length); skb_put(skb, length - ETHERNET_FCS_SIZE); e1000_rx_checksum(adapter, rx_desc, skb); skb->protocol = eth_type_trans(skb, netdev); #ifdef CONFIG_E1000_NAPI netif_receive_skb(skb); #else netif_rx(skb); #endif netdev->last_rx = jiffies; rx_desc->status = 0 ; buffer_info->skb = NULL ; if (++i == rx_ring->count) i = 0 ;rx_desc = E1000_RX_DESC(*rx_ring, i); } rx_ring->next_to_clean = i; e1000_alloc_rx_buffers(adapter); return cleaned;}
下面分析的这个函数有助于我们了解环形接收缓冲区的结构和工作原理:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 static void e1000_alloc_rx_buffers (struct e1000_adapter *adapter) { struct e1000_desc_ring *rx_ring = &adapter->rx_ring;struct net_device *netdev = adapter->netdev;struct pci_dev *pdev = adapter->pdev;struct e1000_rx_desc *rx_desc ;struct e1000_buffer *buffer_info ;struct sk_buff *skb ;int reserve_len = 2 ;unsigned int i;i = rx_ring->next_to_use; buffer_info = &rx_ring->buffer_info[i]; while (!buffer_info->skb) { rx_desc = E1000_RX_DESC(*rx_ring, i) skb = dev_alloc_skb(adapter->rx_buffer_len + reserve_len); if (!skb) { break ; } skb_reserve(skb, reserve_len); skb->dev = netdev; buffer_info->skb = skb; buffer_info->length = adapter->rx_buffer_len; buffer_info->dma = pci_map_single(pdev, skb->data, adapter->rx_buffer_len, PCI_DMA_FROMDEVICE); rx_desc->buffer_addr = cpu_to_le64(buffer_info->dma); if (++i == rx_ring->count) i = 0 ;buffer_info = &rx_ring->buffer_info[i]; } rx_ring->next_to_use = i; }
设备收发包之NAPI/非NAPI方式收包
http://www.cnblogs.com/wanpengcoder/p/7419143.html