某大型互联网公司的网关在高峰期出现诡异的性能抖动:平时 10Mpps 稳定处理,但每隔几秒就掉到 5Mpps 持续数十毫秒。根因是批处理大小在运行时动态变化导致 cache miss 激增。数据平面的性能不只取决于算法,更取决于数据的组织方式。
一、引言:数据平面——从收包到转发的核心链路
前两章分别深入了 DPDK 的内存管理(Ch04)和轮询模式驱动(Ch05)。本章将两者串联起来,聚焦数据包在 DPDK 应用中的高效处理全链路:从无锁环形缓冲区 rte_ring 的精妙设计,到 rte_mbuf 分段链与零拷贝技巧,再到包解析辅助库、CRC/Hash 硬件卸载、TSO/LRO、Scatter-Gather I/O——这些机制共同构成了 DPDK 数据平面的核心引擎。
理解了这些机制,你就能回答一个关键问题:数据包在 DPDK 应用中是如何以零拷贝、无锁、批量处理的方式高效流动的?
二、rte_ring:无锁环形缓冲区
rte_ring 是 DPDK 中最基础的数据结构之一,它是一个基于数组实现的环形缓冲区(Ring Buffer),支持无锁的多生产者/多消费者操作。几乎所有 DPDK 组件都依赖它:PMD 收包后将 mbuf 放入 ring、应用从 ring 取出 mbuf 处理、mempool 内部用 ring 管理空闲 mbuf、多核之间通过 ring 传递消息……
2.1 Ring Buffer 的内存布局
rte_ring 的核心设计极其简洁:一块连续的指针数组 + 两个头尾指针。大小必须是 2 的幂(power of 2),这样取模运算可以用位与(mask)替代除法,极大提升性能。
关键设计要点:
- size 为 2 的幂:
index = (head) & mask,一条 AND 指令替代 MOD 运算 - head/tail 分离:生产者操作
prod.head和prod.tail,消费者操作cons.head和cons.tail,两者互不干扰 - 两阶段提交:先移动 head 预留空间,完成操作后再移动 tail 确认。这保证了其他线程看到的始终是一致的状态
- 水线(Watermark):可设置高水位线,当 ring 中元素数量超过阈值时,enqueue 返回错误,实现背压(Backpressure)
rte_ring 存储的是指针(void *),而不是数据本身。这意味着 ring 中传递的是 mbuf 的地址,而非 mbuf 的内容。这是零拷贝设计的基础——数据始终在 mbuf 中不动,只有指针在 ring 中流转。
2.2 SPSC:单生产者单消费者
SPSC(Single Producer Single Consumer)是最简单、最快的 ring 模式。当只有一个线程入队、一个线程出队时,不需要任何原子操作或 CAS(Compare-And-Swap),只需普通的内存读写加上适当的内存屏障。
SPSC enqueue 流程:
// SPSC 入队(简化逻辑)static __rte_always_inline unsigned intrte_ring_sp_enqueue_bulk(struct rte_ring *r, void *const *obj_table, unsigned int n){ uint32_t prod_head = r->prod.head; // 读取生产者头 uint32_t cons_tail = r->cons.tail; // 读取消费者尾(可见的已出队位置) uint32_t free = r->size + cons_tail - prod_head; // 可用空间
if (unlikely(n > free)) return 0; // 空间不足
// 将对象写入 ring 数组 const uint32_t mask = r->mask; uint32_t idx = prod_head & mask; // 批量拷贝指针到 ring for (i = 0; i < n; i++) r->ring[(idx + i) & mask] = obj_table[i];
// 更新生产者头(SPSC 不需要 CAS) rte_smp_wmb(); // 内存屏障:确保写入数据在更新指针之前可见 r->prod.head = prod_head + n; r->prod.tail = prod_head + n; // SPSC 中 head == tail 可同时更新 return n;}SPSC 的性能优势来自两点:
- 无原子操作:不需要 CAS 指令(
lock cmpxchg),避免了总线锁和缓存行弹跳 - 无重试循环:不需要”尝试→失败→重试”的 CAS 循环,路径确定性强
在典型的 pipeline 模型中,两个相邻 lcore 之间传递数据包就是 SPSC 场景——一个 lcore 负责收包入队,另一个 lcore 负责出队处理。
2.3 MPMC:多生产者多消费者
MPMC(Multi Producer Multi Consumer)模式允许多个线程同时入队和出队。这需要 CAS 操作来保证并发安全。
MPMC enqueue 的 CAS 流程:
// MPMC 入队(简化逻辑)static __rte_always_inline unsigned intrte_ring_mp_enqueue_bulk(struct rte_ring *r, void *const *obj_table, unsigned int n){ uint32_t prod_head, prod_next; uint32_t cons_tail;
do { prod_head = r->prod.head; // 读取当前 head cons_tail = r->cons.tail; // 读取消费者 tail prod_next = prod_head + n; // 预计的新 head
if (n > r->size + cons_tail - prod_head) return 0; // 空间不足
} while (!rte_atomic32_cmpset(&r->prod.head, prod_head, prod_next)); // ↑ CAS:如果 r->prod.head 仍等于 prod_head,则更新为 prod_next // 如果不等(其他线程已修改),则重试
// 写入数据到 ring const uint32_t mask = r->mask; uint32_t idx = prod_head & mask; for (i = 0; i < n; i++) r->ring[(idx + i) & mask] = obj_table[i];
// 等待之前的入队操作完成(保证顺序) while (r->prod.tail != prod_head) rte_pause(); // 自旋等待
rte_smp_wmb(); r->prod.tail = prod_next; // 确认入队 return n;}MPMC 的关键机制是 两阶段提交:
- 第一阶段(预留):CAS 修改
prod.head,预留 ring 中的空间。此时数据还没写入 - 第二阶段(确认):数据写入完成后,更新
prod.tail。其他消费者只有看到prod.tail更新后才会读取这些数据
MPMC 模式在高并发场景下可能遇到 CAS 竞争导致的性能下降。当多个生产者同时尝试 CAS 修改 prod.head 时,只有一个会成功,其余必须重试。如果竞争激烈,CAS 重试率会显著上升,导致 CPU 时间浪费在自旋等待上。DPDK 提供了 RTS 和 HTS 模式来缓解这一问题。
2.4 RTS 与 HTS:可扩展性优化
当生产者/消费者数量增多时,标准 MPMC 模式的 CAS 竞争会加剧。DPDK 从 19.08 版本开始引入了两种改进模式:
RTS(Relaxed Tail Sync):
RTS 模式放宽了 tail 指针的同步要求。在标准 MPMC 中,一个线程必须等待前面所有线程完成数据写入后才能更新 tail(严格顺序保证)。RTS 允许 tail 指针”跳跃式”更新——只要数据已经写入,即使前面还有线程未完成,也可以更新 tail。
RTS 的核心思想是:用 prod.head 和 prod.tail 之间的差值来跟踪”正在进行的入队操作”,而不是强制严格顺序。这减少了线程间的等待时间,提高了多生产者场景的吞吐量。
HTS(Head/Tail Sync):
HTS 模式采用”暂存区”策略:每个线程在入队时先获取一个 slot 的独占权(通过 CAS 修改 head),然后写入数据,最后更新 tail。与标准 MPMC 不同的是,HTS 不需要等待其他线程——每个线程独立完成自己的 head→tail 更新。
HTS 的优势在于延迟更稳定:不会因为某个慢线程阻塞其他线程的 tail 更新。代价是 tail 的更新可能不是严格递增的,需要额外的逻辑来保证正确性。
| 模式 | CAS 竞争 | 延迟稳定性 | 适用场景 |
|---|---|---|---|
| SPSC | 无 | 最好 | Pipeline 模式,一对一传递 |
| MPMC | 高(head 竞争 + tail 等待) | 差(慢线程阻塞) | 少量生产者/消费者 |
| RTS | 中(head 竞争,tail 放宽) | 中 | 多生产者,吞吐优先 |
| HTS | 中(head 竞争,tail 独立) | 好 | 多生产者,延迟敏感 |
2.5 零拷贝 Peek API
传统的 rte_ring_dequeue_bulk 是”出队即移除”——消费者从 ring 中取出指针后,这些 slot 立即变为可用。但在某些场景下,希望先窥视(Peek)数据,处理完成后再确认移除,这样可以实现零拷贝的流水线处理。
DPDK 20.11 引入了 Peek API,将出队操作拆分为三步:
// 零拷贝 Peek API 使用示例struct rte_ring *ring;void *objs[BURST_SIZE];
// 第一步:窥视——获取可读指针,但不移动 cons.tailuint32_t n = rte_ring_dequeue_bulk_start(ring, objs, BURST_SIZE, NULL);if (n == 0) return;
// 第二步:处理数据(零拷贝,直接在 mbuf 上操作)for (i = 0; i < n; i++) { struct rte_mbuf *m = objs[i]; // 直接在 mbuf 上处理,无需拷贝 process_packet(m);}
// 第三步:确认——移动 cons.tail,释放 ring 中的 slotrte_ring_dequeue_finish(ring, n);Peek API 的价值在于:消费者可以在处理数据的同时,生产者继续往 ring 后面的 slot 写入新数据。只要 ring 足够大,生产和消费可以真正并行,不会因为 ring 满而阻塞。
2.6 Ring 创建与使用示例
#include <rte_ring.h>#include <rte_mempool.h>#include <rte_mbuf.h>
#define RING_SIZE 1024#define BURST_SIZE 32
// 创建 ringstruct rte_ring *ring;ring = rte_ring_create("pkt_ring", RING_SIZE, rte_socket_id(), RING_F_SP_ENQ | RING_F_SC_DEQ);// RING_F_SP_ENQ: 单生产者入队// RING_F_SC_DEQ: 单消费者出队// 使用 SPSC 模式,性能最优
if (ring == NULL) { rte_exit(EXIT_FAILURE, "Cannot create ring\n");}
// === 生产者线程 ===void producer_loop(struct rte_mempool *mbuf_pool){ struct rte_mbuf *bufs[BURST_SIZE];
while (1) { // 从 mempool 批量获取 mbuf if (rte_pktmbuf_alloc_bulk(mbuf_pool, bufs, BURST_SIZE) != 0) continue;
// 填充数据到 mbuf(实际场景中由 PMD 收包填充) // ...
// SPSC 入队 uint16_t enqueued = rte_ring_sp_enqueue_bulk(ring, (void *)bufs, BURST_SIZE, NULL);
if (enqueued == 0) { // ring 已满,释放未入队的 mbuf rte_pktmbuf_free_bulk(bufs, BURST_SIZE); } }}
// === 消费者线程 ===void consumer_loop(void){ struct rte_mbuf *bufs[BURST_SIZE];
while (1) { // SPSC 出队 uint16_t n = rte_ring_sc_dequeue_bulk(ring, (void **)bufs, BURST_SIZE, NULL);
if (n == 0) continue;
// 处理数据包 for (uint16_t i = 0; i < n; i++) { process_packet(bufs[i]); }
// 释放 mbuf 回 mempool rte_pktmbuf_free_bulk(bufs, n); }}创建 ring 时通过 RING_F_SP_ENQ | RING_F_SC_DEQ 标志指定 SPSC 模式。如果后续误用 rte_ring_mp_enqueue_bulk() 入队,DPDK 会在调试模式下输出警告。生产环境中务必确保入队/出队函数与 ring 的创建标志一致。
三、rte_mbuf 分段链与零拷贝
在 Ch04 中介绍了 rte_mbuf 的基本结构——它是 DPDK 中数据包的载体,从 mempool 分配,承载从网卡收到的原始数据。本节深入 mbuf 的分段链机制和零拷贝技术,这是处理大包(Jumbo Frame)和避免数据拷贝的关键。
3.1 mbuf 分段链
单个 mbuf 的数据区域大小有限(通常为 2048 字节,由 rte_pktmbuf_data_room_size 决定)。当收到的数据包超过这个大小时——例如 9000 字节的 Jumbo Frame——DPDK 使用**分段链(Segment Chain)**将多个 mbuf 链接起来表示一个完整的数据包。
分段链的关键字段:
// lib/mbuf/rte_mbuf.h(简化)struct rte_mbuf { // ... uint16_t nb_segs; // 分段数量(仅在链头有效) uint16_t port; // 接收端口 uint32_t pkt_len; // 整个数据包的总长度(所有分段之和) uint16_t data_len; // 当前分段的数据长度 struct rte_mbuf *next; // 指向下一个分段(链尾为 NULL) // ...};规则:
- 链头 mbuf 的
nb_segs和pkt_len描述整个链的属性 - 后续 mbuf 的
nb_segs和pkt_len无意义,只看data_len和next - 链尾 mbuf 的
next为NULL - 遍历分段链的标准方式:
mb = m; while (mb != NULL) { ... ; mb = mb->next; }
3.2 Jumbo Frame 处理
当网卡收到超过单个 mbuf 容量的数据包时,PMD 会自动将数据分散到多个 mbuf 中,形成分段链。这需要两个前提条件:
- 网卡支持 Scatter-Gather:网卡 DMA 能将一个数据包分散到多个物理地址
- mempool 中有足够的 mbuf:每个分段都需要一个独立的 mbuf
// 检查 mbuf 是否为分段包static inline intis_jumbo_frame(struct rte_mbuf *m){ return m->nb_segs > 1;}
// 遍历分段链,计算总数据长度static inline uint32_tcalc_total_len(struct rte_mbuf *m){ uint32_t total = 0; struct rte_mbuf *seg = m; while (seg != NULL) { total += seg->data_len; seg = seg->next; } return total;}
// 访问分段链中的特定偏移量static inline uint8_t *mbuf_data_at_offset(struct rte_mbuf *m, uint32_t offset){ struct rte_mbuf *seg = m; while (seg != NULL) { if (offset < seg->data_len) return rte_pktmbuf_mtod_offset(seg, uint8_t *, offset); offset -= seg->data_len; seg = seg->next; } return NULL; // 偏移超出数据包长度}处理分段链时,务必遍历所有 segment,不能假设数据在连续内存中。一个常见错误是直接对链头 mbuf 调用 rte_pktmbuf_mtod() 后按 pkt_len 长度访问内存——这会越界读取到不属于当前 mbuf 的数据。对于分段包,必须逐段处理或使用 rte_pktmbuf_read() 辅助函数。
3.3 零拷贝技术
零拷贝是 DPDK 性能的核心支柱之一。在数据包处理过程中,希望数据始终留在原始 mbuf 中,避免任何内存拷贝。DPDK 提供了两种零拷贝机制:
rte_pktmbuf_clone(克隆):
克隆创建一个新的 mbuf 结构,但共享原始 mbuf 的数据区域。多个 mbuf 结构指向同一块数据,通过引用计数管理生命周期。
// 克隆 mbuf——零拷贝struct rte_mbuf *clone;clone = rte_pktmbuf_clone(original_mbuf, mbuf_pool);
// clone 与 original_mbuf 共享数据区域// original_mbuf 的引用计数 refcnt 增加到 2// 修改 clone 的数据会同时影响 original_mbuf(共享缓冲区)
// 释放时:两个 mbuf 都需要 free// 当 refcnt 降为 0 时,数据区域才真正释放回 mempoolrte_pktmbuf_free(clone); // refcnt: 2 → 1rte_pktmbuf_free(original_mbuf); // refcnt: 1 → 0,数据区域释放rte_pktmbuf_attach(附加):
attach 是更底层的操作,将一个 mbuf 附加到另一个 mbuf 的数据区域。clone 内部就是调用 attach 实现的。
// 手动附加 mbuf 到另一个 mbuf 的数据区域void rte_pktmbuf_attach(struct rte_mbuf *mi, struct rte_mbuf *m);
// mi 附加到 m 的数据区域// m 的 refcnt 增加// mi 的原数据区域被释放(如果 mi 之前有数据)// 之后 mi 和 m 共享数据
// 分离——将 mbuf 与其共享的数据区域断开void rte_pktmbuf_detach(struct rte_mbuf *m);// 分离后 mbuf 变为空状态,可以重新使用零拷贝的典型应用场景:
| 场景 | 技术 | 说明 |
|---|---|---|
| 组播/广播转发 | rte_pktmbuf_clone | 一个包需要发往多个端口,每个端口克隆一份 mbuf |
| 流表匹配 | rte_pktmbuf_clone | 将包克隆一份送控制面,原始包继续数据面转发 |
| IP 分片 | rte_pktmbuf_attach | 将原始 mbuf 的各段附加到分片后的新 mbuf |
| 隧道封装 | 直接修改 mbuf | 在 headroom 中添加新头部,无需拷贝数据 |
3.4 mbuf 链操作示例
#include <rte_mbuf.h>#include <rte_mempool.h>
// 将两个 mbuf 链拼接成一个链static inline voidmbuf_chain(struct rte_mbuf *head, struct rte_mbuf *tail){ // 找到 head 链的最后一个 segment struct rte_mbuf *last = head; while (last->next != NULL) last = last->next;
// 拼接 last->next = tail; head->nb_segs += tail->nb_segs; head->pkt_len += tail->pkt_len;}
// 从分段链中分离第一个 segment// 返回剩余的链(可能为 NULL)static inline struct rte_mbuf *mbuf_split_first(struct rte_mbuf *m){ struct rte_mbuf *rest = m->next;
m->next = NULL; m->nb_segs = 1; // m->pkt_len 保持为第一个 segment 的 data_len m->pkt_len = m->data_len;
if (rest != NULL) { // rest 成为新的链头,其 nb_segs 和 pkt_len 需要更新 // 注意:原始链中后续 segment 的 nb_segs/pkt_len 可能不准确 // 需要重新计算 } return rest;}
// 克隆分段包用于多端口转发static inline intclone_and_forward(struct rte_mbuf *m, struct rte_mempool *pool, uint16_t *dst_ports, int n_ports){ for (int i = 0; i < n_ports; i++) { struct rte_mbuf *copy; if (i == n_ports - 1) { // 最后一个端口直接使用原始 mbuf,无需克隆 copy = m; } else { copy = rte_pktmbuf_clone(m, pool); if (copy == NULL) { rte_pktmbuf_free(m); return -ENOMEM; } } // 发送到目标端口 rte_eth_tx_burst(dst_ports[i], 0, ©, 1); } return 0;}克隆分段链时,rte_pktmbuf_clone 会为链头创建一个新的 mbuf 结构,但后续 segment 的 mbuf 不会被复制——它们通过 next 指针共享。这意味着克隆后的链和原始链共享除链头以外的所有 segment mbuf,引用计数会相应增加。释放时必须使用 rte_pktmbuf_free(而非手动操作 next 指针),确保引用计数正确递减。
四、包解析辅助库
数据包进入 DPDK 应用后,第一步通常是解析协议头——提取 Ethernet、IP、TCP/UDP 等层的信息。DPDK 提供了一系列辅助库,定义了标准的协议头结构体和解析函数,避免开发者手动计算偏移量。
4.1 rte_ether_hdr:Ethernet 头解析
struct rte_ether_addr { uint8_t addr_bytes[RTE_ETHER_ADDR_LEN]; // 6 字节 MAC 地址} __rte_aligned(2);
struct rte_ether_hdr { struct rte_ether_addr dst_addr; // 目标 MAC struct rte_ether_addr src_addr; // 源 MAC rte_be16_t ether_type; // 上层协议类型} __rte_aligned(2);// ether_type 常见值:// RTE_ETHER_TYPE_IPV4 (0x0800) — IPv4// RTE_ETHER_TYPE_IPV6 (0x086DD) — IPv6// RTE_ETHER_TYPE_ARP (0x0806) — ARP// RTE_ETHER_TYPE_VLAN (0x8100) — 802.1Q VLAN// RTE_ETHER_TYPE_QINQ (0x88A8) — 802.1ad QinQ4.2 rte_ipv4_hdr:IPv4 头解析
struct rte_ipv4_hdr { uint8_t version_ihl; // 版本(4bit) + 头部长度(4bit) uint8_t type_of_service; // DSCP + ECN rte_be16_t total_length; // IP 包总长度 rte_be16_t packet_id; // 标识 rte_be16_t fragment_offset; // 分片偏移 + 标志 uint8_t time_to_live; // TTL uint8_t next_proto_id; // 上层协议号 rte_be16_t hdr_checksum; // 头部校验和 rte_be32_t src_addr; // 源 IP rte_be32_t dst_addr; // 目标 IP} __rte_aligned(2);
// 常用辅助函数uint16_t rte_ipv4_hdr_len(const struct rte_ipv4_hdr *ipv4_hdr);// 返回 IP 头部长度(字节),根据 IHL 字段计算
int rte_ipv4_frag_pkt_is_fragmented(const struct rte_ipv4_hdr *hdr);// 判断 IP 包是否被分片4.3 rte_tcp_hdr / rte_udp_hdr:传输层头解析
struct rte_tcp_hdr { rte_be16_t src_port; // 源端口 rte_be16_t dst_port; // 目标端口 rte_be32_t sent_seq; // 序列号 rte_be32_t recv_ack; // 确认号 uint8_t data_off; // 数据偏移(4bit)+ 保留(4bit) uint8_t tcp_flags; // TCP 标志位 rte_be16_t rx_win; // 接收窗口 rte_be16_t cksum; // 校验和 rte_be16_t tcp_urp; // 紧急指针} __rte_aligned(2);
// TCP 标志位常量#define RTE_TCP_CWR_FLAG 0x80 // 拥塞窗口减少#define RTE_TCP_ECE_FLAG 0x40 // ECN-Echo#define RTE_TCP_URG_FLAG 0x20 // 紧急#define RTE_TCP_ACK_FLAG 0x10 // 确认#define RTE_TCP_PSH_FLAG 0x08 // 推送#define RTE_TCP_RST_FLAG 0x04 // 重置#define RTE_TCP_SYN_FLAG 0x02 // 同步#define RTE_TCP_FIN_FLAG 0x01 // 结束
// lib/net/rte_udp.hstruct rte_udp_hdr { rte_be16_t src_port; // 源端口 rte_be16_t dst_port; // 目标端口 rte_be16_t dgram_len; // 数据报长度 rte_be16_t dgram_cksum; // 校验和} __rte_aligned(2);4.4 rte_net:包类型检测
手动逐层解析协议头既繁琐又容易出错。DPDK 提供了 rte_net 库,通过 rte_net_get_ptype() 自动检测数据包的协议类型:
uint32_t rte_net_get_ptype(struct rte_mbuf *m, struct rte_net_hdr_lens *pvlens, uint32_t layers);// 返回值:位掩码,表示检测到的协议层// layers 参数:指定要检测的协议层(L2/L3/L4)包类型返回值是位掩码,常用宏提取各层信息:
// 提取 L2 类型uint32_t l2_type = rte_net_ptype_get_l2(ptype);// RTE_PTYPE_L2_ETHER, RTE_PTYPE_L2_ETHER_VLAN, RTE_PTYPE_L2_ETHER_QINQ ...
// 提取 L3 类型uint32_t l3_type = rte_net_ptype_get_l3(ptype);// RTE_PTYPE_L3_IPV4, RTE_PTYPE_L3_IPV6, RTE_PTYPE_L3_IPV4_EXT ...
// 提取 L4 类型uint32_t l4_type = rte_net_ptype_get_l4(ptype);// RTE_PTYPE_L4_TCP, RTE_PTYPE_L4_UDP, RTE_PTYPE_L4_ICMP ...4.5 包解析完整示例
#include <rte_net.h>#include <rte_ether.h>#include <rte_ip.h>#include <rte_tcp.h>#include <rte_udp.h>
// 完整的包解析函数static inline intparse_packet(struct rte_mbuf *m, struct rte_net_hdr_lens *hdr_lens){ // 方法一:使用 rte_net_get_ptype 自动检测 uint32_t ptype = rte_net_get_ptype(m, hdr_lens, RTE_PTYPE_L2_MASK | RTE_PTYPE_L3_MASK | RTE_PTYPE_L4_MASK);
// 方法二:手动逐层解析(更灵活,可处理非标准包) struct rte_ether_hdr *eth; struct rte_ipv4_hdr *ipv4; struct rte_tcp_hdr *tcp; struct rte_udp_hdr *udp;
// L2: Ethernet eth = rte_pktmbuf_mtod(m, struct rte_ether_hdr *); uint16_t ether_type = rte_be_to_cpu_16(eth->ether_type);
// 处理 VLAN 标签 uint32_t offset = sizeof(struct rte_ether_hdr); if (ether_type == RTE_ETHER_TYPE_VLAN) { // 跳过 802.1Q VLAN 标签(4 字节) struct rte_vlan_hdr *vlan = rte_pktmbuf_mtod_offset(m, struct rte_vlan_hdr *, offset); ether_type = rte_be_to_cpu_16(vlan->eth_proto); offset += sizeof(struct rte_vlan_hdr); }
// L3: IPv4 if (ether_type == RTE_ETHER_TYPE_IPV4) { ipv4 = rte_pktmbuf_mtod_offset(m, struct rte_ipv4_hdr *, offset); offset += rte_ipv4_hdr_len(ipv4);
uint32_t src_ip = rte_be_to_cpu_32(ipv4->src_addr); uint32_t dst_ip = rte_be_to_cpu_32(ipv4->dst_addr);
// L4: TCP / UDP if (ipv4->next_proto_id == IPPROTO_TCP) { tcp = rte_pktmbuf_mtod_offset(m, struct rte_tcp_hdr *, offset); uint16_t src_port = rte_be_to_cpu_16(tcp->src_port); uint16_t dst_port = rte_be_to_cpu_16(tcp->dst_port); printf("TCP: %u -> %u\n", src_port, dst_port); } else if (ipv4->next_proto_id == IPPROTO_UDP) { udp = rte_pktmbuf_mtod_offset(m, struct rte_udp_hdr *, offset); uint16_t src_port = rte_be_to_cpu_16(udp->src_port); uint16_t dst_port = rte_be_to_cpu_16(udp->dst_port); printf("UDP: %u -> %u\n", src_port, dst_port); } }
return 0;}rte_pktmbuf_mtod_offset(m, type, offset) 是 DPDK 中访问 mbuf 数据的常用宏,它返回 m->data_addr + offset 处的指针,并强制转换为指定类型。这个宏只访问链头 mbuf 的数据区域,如果 offset 超出链头 data_len,需要使用 rte_pktmbuf_read() 代替。
五、CRC 与 Hash 硬件卸载
数据包处理中,校验和计算和哈希查找是两个高频操作。如果全部由 CPU 软件完成,会消耗大量 CPU 周期。DPDK 支持将这些操作卸载到网卡硬件,释放 CPU 用于业务逻辑。
5.1 CRC 硬件卸载
以太网帧尾部有 4 字节的 CRC(FCS,Frame Check Sequence),用于检测传输错误。传统内核网络栈在收包时由网卡硬件自动剥离 CRC 并校验,DPDK 同样支持这一特性。
RX 侧 CRC 卸载:
// 端口配置时启用 CRC 剥离struct rte_eth_conf port_conf = { .rxmode = { .offloads = DEV_RX_OFFLOAD_CRC_STRIP, // 硬件自动剥离 CRC // 注意:较新 DPDK 版本中 CRC_STRIP 已默认启用 // 旧版本需要显式设置 },};TX 侧 CRC 卸载:
发送时,网卡硬件自动在帧尾部插入 CRC,应用无需计算。这是几乎所有网卡都支持的基本功能,DPDK 默认启用。
校验和卸载:
比 CRC 更重要的是 IP/TCP/UDP 校验和的硬件卸载:
// RX 侧:网卡校验并标记校验和是否正确struct rte_eth_conf port_conf = { .rxmode = { .offloads = DEV_RX_OFFLOAD_IPV4_CKSUM | // IPv4 头部校验和 DEV_RX_OFFLOAD_UDP_CKSUM | // UDP 校验和 DEV_RX_OFFLOAD_TCP_CKSUM, // TCP 校验和 },};
// 收包后检查校验和状态struct rte_mbuf *m;// ... 收包后 ...if (m->ol_flags & RTE_MBUF_F_RX_IP_CKSUM_BAD) { // IPv4 校验和错误,丢弃 rte_pktmbuf_free(m); continue;}if (m->ol_flags & RTE_MBUF_F_RX_L4_CKSUM_BAD) { // TCP/UDP 校验和错误,丢弃 rte_pktmbuf_free(m); continue;}
// TX 侧:网卡计算并插入校验和struct rte_eth_conf port_conf = { .txmode = { .offloads = DEV_TX_OFFLOAD_IPV4_CKSUM | // 硬件计算 IPv4 校验和 DEV_TX_OFFLOAD_UDP_CKSUM | // 硬件计算 UDP 校验和 DEV_TX_OFFLOAD_TCP_CKSUM, // 硬件计算 TCP 校验和 },};
// 发包前设置 mbuf 标志,告诉网卡需要计算哪些校验和m->ol_flags = RTE_MBUF_F_TX_IPV4 | // 使用 IPv4 RTE_MBUF_F_TX_IP_CKSUM | // 硬件计算 IP 校验和 RTE_MBUF_F_TX_TCP_CKSUM; // 硬件计算 TCP 校验和m->l2_len = sizeof(struct rte_ether_hdr);m->l3_len = sizeof(struct rte_ipv4_hdr);m->l4_len = sizeof(struct rte_tcp_hdr);5.2 rte_hash:软件哈希库
当硬件卸载不可用或需要更灵活的哈希功能时,DPDK 提供了 rte_hash 库——基于 Cuckoo Hash 的高性能哈希表。
Cuckoo Hash 的特点:
- O(1) 最坏情况查找:每个 key 最多映射到两个 bucket,查找最多访问两次
- 高空间利用率:负载因子可达 90% 以上
- 无锁读:读操作不需要加锁,适合读多写少场景
- 批量查找:支持一次查找多个 key,利用缓存局部性
#include <rte_hash.h>
// 创建哈希表struct rte_hash_parameters hash_params = { .name = "flow_table", .entries = 1 << 20, // 最大条目数 .key_len = sizeof(struct flow_key), // key 长度 .hash_func = rte_jhash, // 哈希函数 .hash_func_init_val = 0, .socket_id = rte_socket_id(),};struct rte_hash *flow_table = rte_hash_create(&hash_params);
// 查找struct flow_key key = { .src_ip = src, .dst_ip = dst, .src_port = sp, .dst_port = dp };int32_t index = rte_hash_lookup(flow_table, &key);if (index >= 0) { // 找到,index 是数据数组中的位置 void *data = rte_hash_get_key_with_position(flow_table, index, NULL);}
// 插入int ret = rte_hash_add_key_data(flow_table, &key, flow_data);
// 删除ret = rte_hash_del_key(flow_table, &key);5.3 RSS:接收端缩放
RSS(Receive Side Scaling)是网卡硬件的哈希分流机制。网卡对每个收到的数据包计算一个哈希值,根据哈希值将包分配到不同的 RX 队列。每个 RX 队列绑定到一个 lcore,从而实现多核并行处理。
RSS 配置:
// 配置 RSSstruct rte_eth_rss_conf rss_conf = { .rss_key = rss_key, // RSS 哈希密钥(40 字节) .rss_key_len = 40, .rss_hf = RTE_ETH_RSS_IP | // 对 IP 头部(src/dst IP)计算哈希 RTE_ETH_RSS_TCP | // 对 TCP 头部(src/dst port)计算哈希 RTE_ETH_RSS_UDP, // 对 UDP 头部(src/dst port)计算哈希};
struct rte_eth_conf port_conf = { .rxmode = { .mq_mode = RTE_ETH_MQ_RX_RSS, // 启用 RSS 多队列模式 }, .rx_adv_conf = { .rss_conf = rss_conf, },};RSS 的关键特性:
- 保序性:同一五元组(src_ip, dst_ip, src_port, dst_port, protocol)的包始终哈希到同一队列,保证流的包序不被打乱
- 哈希算法:大多数网卡使用 Toeplitz Hash,DPDK 提供了
rte_thash库用于软件端复现相同的哈希值 - RSS Key:默认 key 可能导致哈希分布不均匀,建议使用非对称 RSS key
5.4 rte_thash:Toeplitz 哈希
在某些场景下,DPDK 应用需要在软件端计算与网卡 RSS 相同的哈希值——例如确定某个流会被分配到哪个 RX 队列,或者在控制面预计算分流策略。
#include <rte_thash.h>
// 使用与网卡相同的 RSS key 计算 Toeplitz 哈希uint8_t rss_key[40];// 从网卡获取 RSS keyrte_eth_dev_rss_hash_conf_get(port_id, &rss_conf);memcpy(rss_key, rss_conf.rss_key, 40);
// 计算五元组的 Toeplitz 哈希union rte_thash_tuple tuple;tuple.v4.src_addr = rte_cpu_to_be_32(src_ip);tuple.v4.dst_addr = rte_cpu_to_be_32(dst_ip);tuple.v4.sport = rte_cpu_to_be_16(src_port);tuple.v4.dport = rte_cpu_to_be_16(dst_port);
uint32_t hash = rte_softrss((uint32_t *)&tuple, RTE_THASH_V4_L4_LEN, rss_key);
// 根据 hash 确定目标 RX 队列uint16_t queue_id = hash % nb_rx_queues;RSS 哈希只考虑五元组,不考虑包的大小和内容。这意味着如果网络中有大量相同五元组的流量(例如大量 HTTP 请求到同一个服务器),这些流量会被集中到一个 RX 队列和一个 lcore 上,造成负载不均衡。解决方案包括:使用非对称 RSS key、在应用层做二次分流(如 rte_distributor)、或使用支持更细粒度分流的网卡。
六、TSO 与 LRO
TCP 协议对段大小有限制(MSS,通常为 1460 字节)。当应用发送大量数据时,TCP 层需要将数据分割为多个不超过 MSS 的段。如果由 CPU 完成分割,每次发送大块数据都需要多次协议头填充和校验和计算。TSO 和 LRO 将这些操作卸载到网卡硬件,大幅减少 CPU 开销。
6.1 TSO(TCP Segmentation Offload)
TSO 允许应用将一个大的 TCP 数据块(远超 MSS)直接交给网卡,由网卡硬件完成 TCP 分段。应用只需构造一个包含完整数据的大 mbuf,网卡自动将其分割为多个符合 MSS 的 TCP 段并发送。
// 启用 TSOstruct rte_eth_conf port_conf = { .txmode = { .offloads = DEV_TX_OFFLOAD_TCP_TSO, },};
// 发送大段数据时使用 TSOvoid send_large_data(struct rte_mbuf *m, uint16_t port_id){ // 设置 mbuf 标志 m->ol_flags = RTE_MBUF_F_TX_IPV4 | RTE_MBUF_F_TX_IP_CKSUM | RTE_MBUF_F_TX_TCP_CKSUM | RTE_MBUF_F_TX_TCP_SEG; // 启用 TSO
// 设置各层头部长度 m->l2_len = sizeof(struct rte_ether_hdr); // 14 m->l3_len = sizeof(struct rte_ipv4_hdr); // 20 m->l4_len = sizeof(struct rte_tcp_hdr); // 20
// 设置 TSO 分段大小(MSS) m->tso_segsz = 1460; // TCP MSS
// 发送——网卡会自动将大包分割为多个 TCP 段 rte_eth_tx_burst(port_id, 0, &m, 1);}TSO 的性能影响极为显著:
| 指标 | 无 TSO | 有 TSO | 改善 |
|---|---|---|---|
| TX 包数(64KB 数据) | ~44 个 TCP 段 | 1 个大 mbuf | 减少 44 倍 |
| CPU 周期/包 | ~200 cycles | ~200 cycles | 总周期减少 44 倍 |
| PCIe DMA 事务 | 44 次 | 1 次 | 大幅减少 |
| 缓存压力 | 44 次 mbuf 访问 | 1 次 mbuf 访问 | 大幅降低 |
TSO 在 Ch05 介绍的向量化发送路径中尤为重要。没有 TSO 时,64KB 的数据需要 44 次 TX 描述符和 DMA 操作;启用 TSO 后,只需 1 次 TX 描述符提交,网卡内部完成分段。这不仅减少了 CPU 开销,还降低了 PCIe 总线压力,对高吞吐场景至关重要。
6.2 LRO(Large Receive Offload)
LRO 是 TSO 的逆操作:网卡将收到的多个同一 TCP 流的段合并为一个大段,再交给驱动。这减少了应用需要处理的包数量,降低了协议栈处理开销。
// 启用 LROstruct rte_eth_conf port_conf = { .rxmode = { .offloads = DEV_RX_OFFLOAD_TCP_LRO, .max_lro_pkt_size = 65535, // LRO 合并后的最大包大小 },};
// 收包后检查是否为 LRO 合并包struct rte_mbuf *m;// ... 收包后 ...if (m->ol_flags & RTE_MBUF_F_RX_LRO_CKSUM) { // 这是一个 LRO 合并包 // m->pkt_len 可能远大于 MSS // m->tso_segsz 包含合并前的段数量信息}LRO 的注意事项:
- 保序性:LRO 必须保证合并的段是连续的(序列号递增),不能乱序合并
- TCP 标志:带有 URG、SYN、FIN、RST 标志的段不能合并
- 窗口更新:窗口大小变化的段不能合并
- 超时:如果一段时间内没有收到新的段,网卡应该提交已合并的数据,避免无限等待
LRO 在转发场景中需要谨慎使用。如果 DPDK 应用是路由器/交换机,直接转发收到的包,那么 LRO 合并后的包可能超过出接口的 MTU,需要重新分片。此外,LRO 会改变包的到达模式——原本是多个小包,现在变成一个大包——这可能影响基于包计数或包间延迟的算法。在转发场景中,通常建议禁用 LRO。
七、Scatter-Gather I/O
Scatter-Gather I/O(也称为向量 I/O)允许一次 I/O 操作处理多个不连续的内存区域。在 DPDK 中,这体现为多段 mbuf 的发送和接收。
7.1 多段 TX 发送
当 mbuf 是分段链(nb_segs > 1)时,网卡需要将多个 mbuf 的数据通过一次 DMA 操作发送出去。这需要网卡支持 Scatter-Gather,并且驱动正确配置 TX 描述符。
// 检查网卡是否支持多段发送struct rte_eth_dev_info dev_info;rte_eth_dev_info_get(port_id, &dev_info);
if (dev_info.tx_offload_capa & DEV_TX_OFFLOAD_MULTI_SEGS) { printf("网卡支持多段发送\n");}
// 启用多段发送struct rte_eth_conf port_conf = { .txmode = { .offloads = DEV_TX_OFFLOAD_MULTI_SEGS, },};多段发送的典型场景:
IP 分片:当转发路径的出接口 MTU 小于入接口时,需要对 IP 包进行分片。每个分片对应一个 mbuf,它们通过 next 指针链接成链,一次 rte_eth_tx_burst 即可发送整个分片链。
// IP 分片示例(简化)struct rte_mbuf *fragments[8];int n_frags = rte_ipv4_fragment_packet( original_mbuf, fragments, // 分片后的 mbuf 数组 8, // 最多 8 个分片 mtu, // 出接口 MTU mbuf_pool, // 用于分配新 mbuf 的 mempool mbuf_pool);
// fragments[0] 是第一个分片,可能是一个分段链// 一次发送所有分片rte_eth_tx_burst(port_id, queue_id, fragments, n_frags);隧道封装:在原始数据包前面添加隧道头(如 VXLAN、GRE),如果 headroom 空间不足,需要分配一个新的 mbuf 作为链头,包含隧道头,原始 mbuf 作为后续 segment。
// VXLAN 封装示例(简化)struct rte_mbuf *outer = rte_pktmbuf_alloc(mbuf_pool);
// 在 outer mbuf 中构造外层头部// Ethernet + IPv4 + UDP + VXLANstruct rte_ether_hdr *eth = rte_pktmbuf_mtod(outer, struct rte_ether_hdr *);// ... 填充外层头部 ...outer->data_len = outer_hdr_len;outer->pkt_len = outer_hdr_len + original_mbuf->pkt_len;
// 将原始 mbuf 链接到 outerouter->next = original_mbuf;outer->nb_segs = 1 + original_mbuf->nb_segs;
// 发送——网卡通过 Scatter-Gather 一次发送 outer + originalrte_eth_tx_burst(port_id, queue_id, &outer, 1);7.2 多段 RX 接收
在 Ch05 中提到,当网卡收到的数据包超过单个 mbuf 的数据区域大小时,PMD 会将数据分散到多个 mbuf 中,形成分段链。这需要:
- 网卡支持 Scatter-Gather 接收
- RX 队列配置时启用多段接收
// 启用多段接收struct rte_eth_conf port_conf = { .rxmode = { .offloads = DEV_RX_OFFLOAD_SCATTER, },};
// 配置 RX 队列时指定每个描述符可用的 mbuf 数量struct rte_eth_rxconf rxconf = { .rx_free_thresh = 32,};// 对于支持 Scatter-Gather 的网卡,需要为每个描述符// 预分配多个 mbuf(通过 rx_seg 配置)7.3 Scatter-Gather 性能考量
| 因素 | 单段 mbuf | 多段 mbuf(Scatter-Gather) |
|---|---|---|
| DMA 事务 | 1 次 | N 次(N = segment 数量) |
| 描述符消耗 | 1 个 | N 个 |
| 缓存局部性 | 好(数据连续) | 差(数据分散) |
| 适用场景 | 普通帧(< 2KB) | Jumbo Frame、隧道封装、IP 分片 |
Scatter-Gather 发送虽然方便,但性能不如单段发送。每个额外的 segment 都需要额外的 TX 描述符和 DMA 操作,增加了 PCIe 总线压力和网卡处理开销。在高吞吐场景中,如果可能,应尽量使用 headroom 空间来添加头部,避免产生多段 mbuf。DPDK 的 rte_pktmbuf_prepend() 可以在 headroom 中添加数据,只要 headroom 空间足够,就不会产生新的 segment。
八、动手实践
实践 1:创建 Ring 并基准测试 SPSC vs MPMC
本实践对比 SPSC 和 MPMC 模式的吞吐量差异,直观感受无锁 vs CAS 的性能差距。
#include <rte_ring.h>#include <rte_lcore.h>#include <rte_cycles.h>#include <rte_mbuf.h>#include <rte_mempool.h>
#define RING_SIZE 4096#define BURST_SIZE 32#define ITERATIONS 10000000
static struct rte_ring *spsc_ring;static struct rte_ring *mpmc_ring;static struct rte_mempool *mbuf_pool;
// SPSC 生产者static intspsc_producer(__rte_unused void *arg){ void *bufs[BURST_SIZE]; uint64_t start = rte_rdtsc();
for (int i = 0; i < ITERATIONS / BURST_SIZE; i++) { // 模拟入队(使用空指针,只测 ring 性能) for (int j = 0; j < BURST_SIZE; j++) bufs[j] = (void *)(uintptr_t)(i * BURST_SIZE + j);
while (rte_ring_sp_enqueue_bulk(spsc_ring, bufs, BURST_SIZE, NULL) == 0) ; // 自旋等待 }
uint64_t end = rte_rdtsc(); printf("SPSC Producer: %.2f Mops/s\n", (double)ITERATIONS / ((end - start) / rte_get_timer_hz()) / 1e6); return 0;}
// SPSC 消费者static intspsc_consumer(__rte_unused void *arg){ void *bufs[BURST_SIZE]; uint64_t count = 0; uint64_t start = rte_rdtsc();
while (count < ITERATIONS) { uint16_t n = rte_ring_sc_dequeue_bulk(spsc_ring, (void **)bufs, BURST_SIZE, NULL); count += n; }
uint64_t end = rte_rdtsc(); printf("SPSC Consumer: %.2f Mops/s\n", (double)ITERATIONS / ((end - start) / rte_get_timer_hz()) / 1e6); return 0;}
// MPMC 生产者(与 SPSC 类似,使用 rte_ring_mp_enqueue_bulk)// MPMC 消费者(与 SPSC 类似,使用 rte_ring_mc_dequeue_bulk)// ... 代码结构相同,替换函数名即可 ...
int main(int argc, char **argv){ rte_eal_init(argc, argv);
// 创建 SPSC ring spsc_ring = rte_ring_create("spsc", RING_SIZE, rte_socket_id(), RING_F_SP_ENQ | RING_F_SC_DEQ);
// 创建 MPMC ring(无 SP/SC 标志) mpmc_ring = rte_ring_create("mpmc", RING_SIZE, rte_socket_id(), 0);
// 分别运行 SPSC 和 MPMC 基准测试 // ... 启动 lcore 执行生产者/消费者 ...
rte_eal_cleanup(); return 0;}预期结果:SPSC 吞吐量通常比 MPMC 高 30%~50%,因为 SPSC 不需要 CAS 操作和重试循环。当生产者/消费者数量增加时,MPMC 的性能下降更明显。
实践 2:使用 rte_net 辅助库解析数据包
本实践演示如何使用 rte_net_get_ptype() 和手动解析两种方式提取数据包的协议信息。
#include <rte_net.h>#include <rte_ether.h>#include <rte_ip.h>#include <rte_tcp.h>#include <rte_udp.h>
static voidparse_with_rte_net(struct rte_mbuf *m){ struct rte_net_hdr_lens hdr_lens; uint32_t ptype = rte_net_get_ptype(m, &hdr_lens, RTE_PTYPE_L2_MASK | RTE_PTYPE_L3_MASK | RTE_PTYPE_L4_MASK);
printf("Packet type: 0x%08x\n", ptype);
// L2 if (RTE_PTYPE_L2_MASK & ptype) printf(" L2: %s\n", (RTE_PTYPE_L2_ETHER & ptype) ? "Ethernet" : "Other");
// L3 if (RTE_PTYPE_L3_IPV4 & ptype) printf(" L3: IPv4 (hdr_len=%u)\n", hdr_lens.l3_len); else if (RTE_PTYPE_L3_IPV6 & ptype) printf(" L3: IPv6 (hdr_len=%u)\n", hdr_lens.l3_len);
// L4 if (RTE_PTYPE_L4_TCP & ptype) printf(" L4: TCP (hdr_len=%u)\n", hdr_lens.l4_len); else if (RTE_PTYPE_L4_UDP & ptype) printf(" L4: UDP (hdr_len=%u)\n", hdr_lens.l4_len); else if (RTE_PTYPE_L4_ICMP & ptype) printf(" L4: ICMP\n");}
static voidparse_manual(struct rte_mbuf *m){ struct rte_ether_hdr *eth = rte_pktmbuf_mtod(m, struct rte_ether_hdr *); uint16_t etype = rte_be_to_cpu_16(eth->ether_type);
printf("Ethernet: " RTE_ETHER_ADDR_PRT_FMT " -> " RTE_ETHER_ADDR_PRT_FMT "\n", RTE_ETHER_ADDR_BYTES(ð->src_addr), RTE_ETHER_ADDR_BYTES(ð->dst_addr));
if (etype == RTE_ETHER_TYPE_IPV4) { struct rte_ipv4_hdr *ip = (struct rte_ipv4_hdr *)(eth + 1); printf("IPv4: %u.%u.%u.%u -> %u.%u.%u.%u, proto=%u\n", IP4_FMT(rte_be_to_cpu_32(ip->src_addr)), IP4_FMT(rte_be_to_cpu_32(ip->dst_addr)), ip->next_proto_id); }}实践 3:启用 TSO 并测量 TX 性能提升
本实践对比启用/禁用 TSO 时的发送性能。
#include <rte_ethdev.h>#include <rte_mbuf.h>
// 构造一个大的 TCP 数据包(64KB)static struct rte_mbuf *build_large_tcp_packet(struct rte_mempool *pool, uint32_t payload_len){ struct rte_mbuf *m = rte_pktmbuf_alloc(pool); if (m == NULL) return NULL;
// 填充 Ethernet + IP + TCP 头部 + payload uint32_t total_len = sizeof(struct rte_ether_hdr) + sizeof(struct rte_ipv4_hdr) + sizeof(struct rte_tcp_hdr) + payload_len;
// 如果 payload 超过单个 mbuf 的数据区域,需要分段链 // ...(此处简化,假设 mbuf 足够大或使用分段链)
// 填充头部 struct rte_ether_hdr *eth = rte_pktmbuf_mtod(m, struct rte_ether_hdr *); // ... 填充 Ethernet 头 ...
struct rte_ipv4_hdr *ip = (struct rte_ipv4_hdr *)(eth + 1); ip->total_length = rte_cpu_to_be_16(total_len - sizeof(struct rte_ether_hdr)); ip->next_proto_id = IPPROTO_TCP; // ... 填充 IP 头 ...
struct rte_tcp_hdr *tcp = (struct rte_tcp_hdr *)(ip + 1); // ... 填充 TCP 头 ...
m->pkt_len = total_len; m->data_len = total_len;
return m;}
// 测试 TSO 性能static voidbenchmark_tso(uint16_t port_id, struct rte_mempool *pool){ struct rte_mbuf *pkts[BURST_SIZE]; uint64_t total_bytes = 0; uint64_t start, end;
// 构造 64KB 的 TCP 包 for (int i = 0; i < BURST_SIZE; i++) { pkts[i] = build_large_tcp_packet(pool, 65536); pkts[i]->ol_flags = RTE_MBUF_F_TX_IPV4 | RTE_MBUF_F_TX_IP_CKSUM | RTE_MBUF_F_TX_TCP_CKSUM | RTE_MBUF_F_TX_TCP_SEG; // TSO pkts[i]->tso_segsz = 1460; // MSS pkts[i]->l2_len = sizeof(struct rte_ether_hdr); pkts[i]->l3_len = sizeof(struct rte_ipv4_hdr); pkts[i]->l4_len = sizeof(struct rte_tcp_hdr); }
start = rte_rdtsc(); for (int iter = 0; iter < 1000; iter++) { uint16_t sent = rte_eth_tx_burst(port_id, 0, pkts, BURST_SIZE); total_bytes += sent * 65536; } end = rte_rdtsc();
double gbps = (double)total_bytes * 8 / ((end - start) / rte_get_timer_hz()) / 1e9; printf("TSO enabled: %.2f Gbps\n", gbps);}预期结果:启用 TSO 后,64KB 数据只需一次 TX 操作(网卡内部拆分为 44 个 TCP 段),相比不启用 TSO 时需要 44 次 TX 操作,吞吐量提升 1050 倍。
实践 4:实现基本 L2 转发与包解析
本实践综合运用本章所学,实现一个完整的 L2 转发应用:收包→解析→修改 MAC→发包。
// l2fwd_parse.c — 基于 DPDK 的 L2 转发与包解析#include <rte_eal.h>#include <rte_ethdev.h>#include <rte_mbuf.h>#include <rte_ether.h>#include <rte_ip.h>#include <rte_tcp.h>#include <rte_net.h>
#define RX_RING_SIZE 1024#define TX_RING_SIZE 1024#define BURST_SIZE 32#define MAX_PKT_BURST 32
static const struct rte_eth_conf port_conf = { .rxmode = { .max_lro_pkt_size = 65535, .offloads = DEV_RX_OFFLOAD_IPV4_CKSUM | DEV_RX_OFFLOAD_UDP_CKSUM | DEV_RX_OFFLOAD_TCP_CKSUM, }, .txmode = { .offloads = DEV_TX_OFFLOAD_IPV4_CKSUM | DEV_TX_OFFLOAD_UDP_CKSUM | DEV_TX_OFFLOAD_TCP_CKSUM | DEV_TX_OFFLOAD_MULTI_SEGS, },};
// L2 转发:交换源/目标 MAC 地址static inline voidl2fwd_mac_swap(struct rte_mbuf *m){ struct rte_ether_hdr *eth = rte_pktmbuf_mtod(m, struct rte_ether_hdr *); struct rte_ether_addr tmp;
rte_ether_addr_copy(ð->dst_addr, &tmp); rte_ether_addr_copy(ð->src_addr, ð->dst_addr); rte_ether_addr_copy(&tmp, ð->src_addr);}
// 包解析与统计struct port_stats { uint64_t rx_packets; uint64_t tx_packets; uint64_t ipv4_packets; uint64_t ipv6_packets; uint64_t tcp_packets; uint64_t udp_packets; uint64_t dropped;};
static struct port_stats stats;
static inline voidparse_and_count(struct rte_mbuf *m){ struct rte_net_hdr_lens hdr_lens; uint32_t ptype = rte_net_get_ptype(m, &hdr_lens, RTE_PTYPE_L2_MASK | RTE_PTYPE_L3_MASK | RTE_PTYPE_L4_MASK);
if (ptype & RTE_PTYPE_L3_IPV4) stats.ipv4_packets++; else if (ptype & RTE_PTYPE_L3_IPV6) stats.ipv6_packets++;
if (ptype & RTE_PTYPE_L4_TCP) stats.tcp_packets++; else if (ptype & RTE_PTYPE_L4_UDP) stats.udp_packets++;}
// 主循环static voidmain_loop(uint16_t rx_port, uint16_t tx_port){ struct rte_mbuf *bufs[BURST_SIZE];
while (1) { // 收包 uint16_t nb_rx = rte_eth_rx_burst(rx_port, 0, bufs, BURST_SIZE);
if (unlikely(nb_rx == 0)) continue;
stats.rx_packets += nb_rx;
for (uint16_t i = 0; i < nb_rx; i++) { // 检查校验和 if (bufs[i]->ol_flags & RTE_MBUF_F_RX_IP_CKSUM_BAD) { rte_pktmbuf_free(bufs[i]); stats.dropped++; bufs[i] = NULL; continue; }
// 解析并统计 parse_and_count(bufs[i]);
// L2 转发:交换 MAC 地址 l2fwd_mac_swap(bufs[i]); }
// 发包 uint16_t nb_tx = rte_eth_tx_burst(tx_port, 0, bufs, nb_rx); stats.tx_packets += nb_tx;
// 释放未发送的 mbuf if (unlikely(nb_tx < nb_rx)) { stats.dropped += (nb_rx - nb_tx); for (uint16_t i = nb_tx; i < nb_rx; i++) rte_pktmbuf_free(bufs[i]); } }}小结
本章深入剖析了 DPDK 数据平面的核心机制,这些机制共同构成了数据包在 DPDK 应用中高效流动的基础设施:
| 机制 | 核心作用 | 关键设计 |
|---|---|---|
rte_ring | 无锁数据传递 | SPSC 无原子操作、MPMC CAS 两阶段提交、RTS/HTS 可扩展优化 |
rte_mbuf 分段链 | 大包处理 | nb_segs + next 指针、Jumbo Frame 分散存储 |
| 零拷贝 | 避免数据拷贝 | clone/attach 共享数据区域、引用计数管理 |
| 包解析辅助库 | 协议头提取 | rte_ether/rte_ip/rte_tcp 结构体、rte_net_get_ptype 自动检测 |
| CRC/Hash 硬件卸载 | 释放 CPU | 网卡计算校验和、RSS 哈希分流、rte_thash 软件复现 |
| TSO/LRO | 减少 TX/RX 包数 | 网卡 TCP 分段/合并、吞吐量提升 10~50 倍 |
| Scatter-Gather I/O | 非连续内存 I/O | 多段 mbuf 一次 DMA、隧道封装和 IP 分片 |
数据平面设计的核心原则可以概括为三点:
- 零拷贝:数据始终在原始 mbuf 中,只有指针在 ring 中流转。clone/attach 是零拷贝的两种实现方式
- 无锁:SPSC 完全无锁,MPMC 通过 CAS 实现无锁并发,RTS/HTS 进一步优化可扩展性
- 批量处理:ring 的 burst 操作、PMD 的向量化收发包、TSO 的大段发送——都是”攒一批再处理”的思路
这些原则与 Ch04 中的大页内存、mempool 预分配,以及 Ch05 中的轮询模式、向量化收发包一脉相承——它们共同构成了 DPDK”用空间换时间、用确定性换吞吐”的设计哲学。
参考资料
- DPDK Programmer’s Guide — Ring Library — rte_ring 设计原理与 API 参考
- DPDK Programmer’s Guide — Mbuf Library — rte_mbuf 结构与分段链管理
- DPDK API Reference — rte_net — 包类型检测与协议头辅助函数
- DPDK Programmer’s Guide — Hash Library — rte_hash Cuckoo Hash 实现与使用
- DPDK Testpmd User Guide — TSO/LRO — TSO 和 LRO 的 testpmd 测试方法
- Intel Ethernet Controller Datasheet — 网卡硬件卸载功能的寄存器级描述
- RFC 7398 — RSS Considerations — RSS 哈希分流的安全与性能考量
支持与分享
如果这篇文章对你有帮助,欢迎支持作者或分享给更多人
部分信息可能已经过时






