mobile wallpaper 1mobile wallpaper 2mobile wallpaper 3mobile wallpaper 4
5729 字
16 分钟
DPDK 数据平面核心机制
2025-05-02

某大型互联网公司的网关在高峰期出现诡异的性能抖动:平时 10Mpps 稳定处理,但每隔几秒就掉到 5Mpps 持续数十毫秒。根因是批处理大小在运行时动态变化导致 cache miss 激增。数据平面的性能不只取决于算法,更取决于数据的组织方式。

一、引言:数据平面——从收包到转发的核心链路#

前两章分别深入了 DPDK 的内存管理(Ch04)和轮询模式驱动(Ch05)。本章将两者串联起来,聚焦数据包在 DPDK 应用中的高效处理全链路:从无锁环形缓冲区 rte_ring 的精妙设计,到 rte_mbuf 分段链与零拷贝技巧,再到包解析辅助库、CRC/Hash 硬件卸载、TSO/LRO、Scatter-Gather I/O——这些机制共同构成了 DPDK 数据平面的核心引擎。

理解了这些机制,你就能回答一个关键问题:数据包在 DPDK 应用中是如何以零拷贝、无锁、批量处理的方式高效流动的?

二、rte_ring:无锁环形缓冲区#

rte_ring 是 DPDK 中最基础的数据结构之一,它是一个基于数组实现的环形缓冲区(Ring Buffer),支持无锁的多生产者/多消费者操作。几乎所有 DPDK 组件都依赖它:PMD 收包后将 mbuf 放入 ring、应用从 ring 取出 mbuf 处理、mempool 内部用 ring 管理空闲 mbuf、多核之间通过 ring 传递消息……

2.1 Ring Buffer 的内存布局#

rte_ring 的核心设计极其简洁:一块连续的指针数组 + 两个头尾指针。大小必须是 2 的幂(power of 2),这样取模运算可以用位与(mask)替代除法,极大提升性能。

graph TB subgraph rte_ring 内存布局 direction LR R["rte_ring 结构体<br/>┌─────────────────┐<br/>│ name │<br/>│ size = 8 │<br/>│ mask = 7 │<br/>│ prod.head = 3 │<br/>│ prod.tail = 3 │<br/>│ cons.head = 1 │<br/>│ cons.tail = 1 │<br/>└─────────────────┘"] ARR["指针数组 ring[0..7]<br/>┌───┬───┬───┬───┬───┬───┬───┬───┐<br/>│ ∅ │ ∅ │mb1│mb2│mb3│ ∅ │ ∅ │ ∅ │<br/>└───┴───┴───┴───┴───┴───┴───┴───┘<br/> 0 1 2 3 4 5 6 7<br/> ↑cons ↑prod"] end R --> ARR style R fill:#e3f2fd,stroke:#1565c0 style ARR fill:#fff3e0,stroke:#e65100

关键设计要点:

  • size 为 2 的幂index = (head) & mask,一条 AND 指令替代 MOD 运算
  • head/tail 分离:生产者操作 prod.headprod.tail,消费者操作 cons.headcons.tail,两者互不干扰
  • 两阶段提交:先移动 head 预留空间,完成操作后再移动 tail 确认。这保证了其他线程看到的始终是一致的状态
  • 水线(Watermark):可设置高水位线,当 ring 中元素数量超过阈值时,enqueue 返回错误,实现背压(Backpressure)
Note

rte_ring 存储的是指针(void *),而不是数据本身。这意味着 ring 中传递的是 mbuf 的地址,而非 mbuf 的内容。这是零拷贝设计的基础——数据始终在 mbuf 中不动,只有指针在 ring 中流转。

2.2 SPSC:单生产者单消费者#

SPSC(Single Producer Single Consumer)是最简单、最快的 ring 模式。当只有一个线程入队、一个线程出队时,不需要任何原子操作或 CAS(Compare-And-Swap),只需普通的内存读写加上适当的内存屏障。

SPSC enqueue 流程

// SPSC 入队(简化逻辑)
static __rte_always_inline unsigned int
rte_ring_sp_enqueue_bulk(struct rte_ring *r, void *const *obj_table,
unsigned int n)
{
uint32_t prod_head = r->prod.head; // 读取生产者头
uint32_t cons_tail = r->cons.tail; // 读取消费者尾(可见的已出队位置)
uint32_t free = r->size + cons_tail - prod_head; // 可用空间
if (unlikely(n > free))
return 0; // 空间不足
// 将对象写入 ring 数组
const uint32_t mask = r->mask;
uint32_t idx = prod_head & mask;
// 批量拷贝指针到 ring
for (i = 0; i < n; i++)
r->ring[(idx + i) & mask] = obj_table[i];
// 更新生产者头(SPSC 不需要 CAS)
rte_smp_wmb(); // 内存屏障:确保写入数据在更新指针之前可见
r->prod.head = prod_head + n;
r->prod.tail = prod_head + n; // SPSC 中 head == tail 可同时更新
return n;
}

SPSC 的性能优势来自两点:

  1. 无原子操作:不需要 CAS 指令(lock cmpxchg),避免了总线锁和缓存行弹跳
  2. 无重试循环:不需要”尝试→失败→重试”的 CAS 循环,路径确定性强

在典型的 pipeline 模型中,两个相邻 lcore 之间传递数据包就是 SPSC 场景——一个 lcore 负责收包入队,另一个 lcore 负责出队处理。

2.3 MPMC:多生产者多消费者#

MPMC(Multi Producer Multi Consumer)模式允许多个线程同时入队和出队。这需要 CAS 操作来保证并发安全。

MPMC enqueue 的 CAS 流程

// MPMC 入队(简化逻辑)
static __rte_always_inline unsigned int
rte_ring_mp_enqueue_bulk(struct rte_ring *r, void *const *obj_table,
unsigned int n)
{
uint32_t prod_head, prod_next;
uint32_t cons_tail;
do {
prod_head = r->prod.head; // 读取当前 head
cons_tail = r->cons.tail; // 读取消费者 tail
prod_next = prod_head + n; // 预计的新 head
if (n > r->size + cons_tail - prod_head)
return 0; // 空间不足
} while (!rte_atomic32_cmpset(&r->prod.head, prod_head, prod_next));
// ↑ CAS:如果 r->prod.head 仍等于 prod_head,则更新为 prod_next
// 如果不等(其他线程已修改),则重试
// 写入数据到 ring
const uint32_t mask = r->mask;
uint32_t idx = prod_head & mask;
for (i = 0; i < n; i++)
r->ring[(idx + i) & mask] = obj_table[i];
// 等待之前的入队操作完成(保证顺序)
while (r->prod.tail != prod_head)
rte_pause(); // 自旋等待
rte_smp_wmb();
r->prod.tail = prod_next; // 确认入队
return n;
}

MPMC 的关键机制是 两阶段提交

  1. 第一阶段(预留):CAS 修改 prod.head,预留 ring 中的空间。此时数据还没写入
  2. 第二阶段(确认):数据写入完成后,更新 prod.tail。其他消费者只有看到 prod.tail 更新后才会读取这些数据
Warning

MPMC 模式在高并发场景下可能遇到 CAS 竞争导致的性能下降。当多个生产者同时尝试 CAS 修改 prod.head 时,只有一个会成功,其余必须重试。如果竞争激烈,CAS 重试率会显著上升,导致 CPU 时间浪费在自旋等待上。DPDK 提供了 RTS 和 HTS 模式来缓解这一问题。

2.4 RTS 与 HTS:可扩展性优化#

当生产者/消费者数量增多时,标准 MPMC 模式的 CAS 竞争会加剧。DPDK 从 19.08 版本开始引入了两种改进模式:

RTS(Relaxed Tail Sync)

RTS 模式放宽了 tail 指针的同步要求。在标准 MPMC 中,一个线程必须等待前面所有线程完成数据写入后才能更新 tail(严格顺序保证)。RTS 允许 tail 指针”跳跃式”更新——只要数据已经写入,即使前面还有线程未完成,也可以更新 tail。

RTS 的核心思想是:用 prod.headprod.tail 之间的差值来跟踪”正在进行的入队操作”,而不是强制严格顺序。这减少了线程间的等待时间,提高了多生产者场景的吞吐量。

HTS(Head/Tail Sync)

HTS 模式采用”暂存区”策略:每个线程在入队时先获取一个 slot 的独占权(通过 CAS 修改 head),然后写入数据,最后更新 tail。与标准 MPMC 不同的是,HTS 不需要等待其他线程——每个线程独立完成自己的 head→tail 更新。

HTS 的优势在于延迟更稳定:不会因为某个慢线程阻塞其他线程的 tail 更新。代价是 tail 的更新可能不是严格递增的,需要额外的逻辑来保证正确性。

模式CAS 竞争延迟稳定性适用场景
SPSC最好Pipeline 模式,一对一传递
MPMC高(head 竞争 + tail 等待)差(慢线程阻塞)少量生产者/消费者
RTS中(head 竞争,tail 放宽)多生产者,吞吐优先
HTS中(head 竞争,tail 独立)多生产者,延迟敏感

2.5 零拷贝 Peek API#

传统的 rte_ring_dequeue_bulk 是”出队即移除”——消费者从 ring 中取出指针后,这些 slot 立即变为可用。但在某些场景下,希望先窥视(Peek)数据,处理完成后再确认移除,这样可以实现零拷贝的流水线处理。

DPDK 20.11 引入了 Peek API,将出队操作拆分为三步:

// 零拷贝 Peek API 使用示例
struct rte_ring *ring;
void *objs[BURST_SIZE];
// 第一步:窥视——获取可读指针,但不移动 cons.tail
uint32_t n = rte_ring_dequeue_bulk_start(ring, objs, BURST_SIZE, NULL);
if (n == 0)
return;
// 第二步:处理数据(零拷贝,直接在 mbuf 上操作)
for (i = 0; i < n; i++) {
struct rte_mbuf *m = objs[i];
// 直接在 mbuf 上处理,无需拷贝
process_packet(m);
}
// 第三步:确认——移动 cons.tail,释放 ring 中的 slot
rte_ring_dequeue_finish(ring, n);

Peek API 的价值在于:消费者可以在处理数据的同时,生产者继续往 ring 后面的 slot 写入新数据。只要 ring 足够大,生产和消费可以真正并行,不会因为 ring 满而阻塞。

2.6 Ring 创建与使用示例#

#include <rte_ring.h>
#include <rte_mempool.h>
#include <rte_mbuf.h>
#define RING_SIZE 1024
#define BURST_SIZE 32
// 创建 ring
struct rte_ring *ring;
ring = rte_ring_create("pkt_ring", RING_SIZE,
rte_socket_id(), RING_F_SP_ENQ | RING_F_SC_DEQ);
// RING_F_SP_ENQ: 单生产者入队
// RING_F_SC_DEQ: 单消费者出队
// 使用 SPSC 模式,性能最优
if (ring == NULL) {
rte_exit(EXIT_FAILURE, "Cannot create ring\n");
}
// === 生产者线程 ===
void producer_loop(struct rte_mempool *mbuf_pool)
{
struct rte_mbuf *bufs[BURST_SIZE];
while (1) {
// 从 mempool 批量获取 mbuf
if (rte_pktmbuf_alloc_bulk(mbuf_pool, bufs, BURST_SIZE) != 0)
continue;
// 填充数据到 mbuf(实际场景中由 PMD 收包填充)
// ...
// SPSC 入队
uint16_t enqueued = rte_ring_sp_enqueue_bulk(ring,
(void *)bufs, BURST_SIZE, NULL);
if (enqueued == 0) {
// ring 已满,释放未入队的 mbuf
rte_pktmbuf_free_bulk(bufs, BURST_SIZE);
}
}
}
// === 消费者线程 ===
void consumer_loop(void)
{
struct rte_mbuf *bufs[BURST_SIZE];
while (1) {
// SPSC 出队
uint16_t n = rte_ring_sc_dequeue_bulk(ring,
(void **)bufs, BURST_SIZE, NULL);
if (n == 0)
continue;
// 处理数据包
for (uint16_t i = 0; i < n; i++) {
process_packet(bufs[i]);
}
// 释放 mbuf 回 mempool
rte_pktmbuf_free_bulk(bufs, n);
}
}
Note

创建 ring 时通过 RING_F_SP_ENQ | RING_F_SC_DEQ 标志指定 SPSC 模式。如果后续误用 rte_ring_mp_enqueue_bulk() 入队,DPDK 会在调试模式下输出警告。生产环境中务必确保入队/出队函数与 ring 的创建标志一致。

三、rte_mbuf 分段链与零拷贝#

Ch04 中介绍了 rte_mbuf 的基本结构——它是 DPDK 中数据包的载体,从 mempool 分配,承载从网卡收到的原始数据。本节深入 mbuf 的分段链机制和零拷贝技术,这是处理大包(Jumbo Frame)和避免数据拷贝的关键。

3.1 mbuf 分段链#

单个 mbuf 的数据区域大小有限(通常为 2048 字节,由 rte_pktmbuf_data_room_size 决定)。当收到的数据包超过这个大小时——例如 9000 字节的 Jumbo Frame——DPDK 使用**分段链(Segment Chain)**将多个 mbuf 链接起来表示一个完整的数据包。

graph LR subgraph "Jumbo Frame (9K) 的 mbuf 分段链" M1["mbuf #1 (head)<br/>nb_segs = 4<br/>data_len = 2048<br/>pkt_len = 9000<br/>next →"] M2["mbuf #2<br/>data_len = 2048<br/>next →"] M3["mbuf #3<br/>data_len = 2048<br/>next →"] M4["mbuf #4 (tail)<br/>data_len = 2856<br/>next = NULL"] end M1 --> M2 --> M3 --> M4 style M1 fill:#e3f2fd,stroke:#1565c0 style M2 fill:#fff3e0,stroke:#e65100 style M3 fill:#fff3e0,stroke:#e65100 style M4 fill:#e8f5e9,stroke:#2e7d32

分段链的关键字段:

// lib/mbuf/rte_mbuf.h(简化)
struct rte_mbuf {
// ...
uint16_t nb_segs; // 分段数量(仅在链头有效)
uint16_t port; // 接收端口
uint32_t pkt_len; // 整个数据包的总长度(所有分段之和)
uint16_t data_len; // 当前分段的数据长度
struct rte_mbuf *next; // 指向下一个分段(链尾为 NULL)
// ...
};

规则:

  • 链头 mbufnb_segspkt_len 描述整个链的属性
  • 后续 mbufnb_segspkt_len 无意义,只看 data_lennext
  • 链尾 mbufnextNULL
  • 遍历分段链的标准方式:mb = m; while (mb != NULL) { ... ; mb = mb->next; }

3.2 Jumbo Frame 处理#

当网卡收到超过单个 mbuf 容量的数据包时,PMD 会自动将数据分散到多个 mbuf 中,形成分段链。这需要两个前提条件:

  1. 网卡支持 Scatter-Gather:网卡 DMA 能将一个数据包分散到多个物理地址
  2. mempool 中有足够的 mbuf:每个分段都需要一个独立的 mbuf
// 检查 mbuf 是否为分段包
static inline int
is_jumbo_frame(struct rte_mbuf *m)
{
return m->nb_segs > 1;
}
// 遍历分段链,计算总数据长度
static inline uint32_t
calc_total_len(struct rte_mbuf *m)
{
uint32_t total = 0;
struct rte_mbuf *seg = m;
while (seg != NULL) {
total += seg->data_len;
seg = seg->next;
}
return total;
}
// 访问分段链中的特定偏移量
static inline uint8_t *
mbuf_data_at_offset(struct rte_mbuf *m, uint32_t offset)
{
struct rte_mbuf *seg = m;
while (seg != NULL) {
if (offset < seg->data_len)
return rte_pktmbuf_mtod_offset(seg, uint8_t *, offset);
offset -= seg->data_len;
seg = seg->next;
}
return NULL; // 偏移超出数据包长度
}
Warning

处理分段链时,务必遍历所有 segment,不能假设数据在连续内存中。一个常见错误是直接对链头 mbuf 调用 rte_pktmbuf_mtod() 后按 pkt_len 长度访问内存——这会越界读取到不属于当前 mbuf 的数据。对于分段包,必须逐段处理或使用 rte_pktmbuf_read() 辅助函数。

3.3 零拷贝技术#

零拷贝是 DPDK 性能的核心支柱之一。在数据包处理过程中,希望数据始终留在原始 mbuf 中,避免任何内存拷贝。DPDK 提供了两种零拷贝机制:

rte_pktmbuf_clone(克隆)

克隆创建一个新的 mbuf 结构,但共享原始 mbuf 的数据区域。多个 mbuf 结构指向同一块数据,通过引用计数管理生命周期。

// 克隆 mbuf——零拷贝
struct rte_mbuf *clone;
clone = rte_pktmbuf_clone(original_mbuf, mbuf_pool);
// clone 与 original_mbuf 共享数据区域
// original_mbuf 的引用计数 refcnt 增加到 2
// 修改 clone 的数据会同时影响 original_mbuf(共享缓冲区)
// 释放时:两个 mbuf 都需要 free
// 当 refcnt 降为 0 时,数据区域才真正释放回 mempool
rte_pktmbuf_free(clone); // refcnt: 2 → 1
rte_pktmbuf_free(original_mbuf); // refcnt: 1 → 0,数据区域释放

rte_pktmbuf_attach(附加)

attach 是更底层的操作,将一个 mbuf 附加到另一个 mbuf 的数据区域。clone 内部就是调用 attach 实现的。

// 手动附加 mbuf 到另一个 mbuf 的数据区域
void rte_pktmbuf_attach(struct rte_mbuf *mi, struct rte_mbuf *m);
// mi 附加到 m 的数据区域
// m 的 refcnt 增加
// mi 的原数据区域被释放(如果 mi 之前有数据)
// 之后 mi 和 m 共享数据
// 分离——将 mbuf 与其共享的数据区域断开
void rte_pktmbuf_detach(struct rte_mbuf *m);
// 分离后 mbuf 变为空状态,可以重新使用

零拷贝的典型应用场景:

场景技术说明
组播/广播转发rte_pktmbuf_clone一个包需要发往多个端口,每个端口克隆一份 mbuf
流表匹配rte_pktmbuf_clone将包克隆一份送控制面,原始包继续数据面转发
IP 分片rte_pktmbuf_attach将原始 mbuf 的各段附加到分片后的新 mbuf
隧道封装直接修改 mbuf在 headroom 中添加新头部,无需拷贝数据

3.4 mbuf 链操作示例#

#include <rte_mbuf.h>
#include <rte_mempool.h>
// 将两个 mbuf 链拼接成一个链
static inline void
mbuf_chain(struct rte_mbuf *head, struct rte_mbuf *tail)
{
// 找到 head 链的最后一个 segment
struct rte_mbuf *last = head;
while (last->next != NULL)
last = last->next;
// 拼接
last->next = tail;
head->nb_segs += tail->nb_segs;
head->pkt_len += tail->pkt_len;
}
// 从分段链中分离第一个 segment
// 返回剩余的链(可能为 NULL)
static inline struct rte_mbuf *
mbuf_split_first(struct rte_mbuf *m)
{
struct rte_mbuf *rest = m->next;
m->next = NULL;
m->nb_segs = 1;
// m->pkt_len 保持为第一个 segment 的 data_len
m->pkt_len = m->data_len;
if (rest != NULL) {
// rest 成为新的链头,其 nb_segs 和 pkt_len 需要更新
// 注意:原始链中后续 segment 的 nb_segs/pkt_len 可能不准确
// 需要重新计算
}
return rest;
}
// 克隆分段包用于多端口转发
static inline int
clone_and_forward(struct rte_mbuf *m, struct rte_mempool *pool,
uint16_t *dst_ports, int n_ports)
{
for (int i = 0; i < n_ports; i++) {
struct rte_mbuf *copy;
if (i == n_ports - 1) {
// 最后一个端口直接使用原始 mbuf,无需克隆
copy = m;
} else {
copy = rte_pktmbuf_clone(m, pool);
if (copy == NULL) {
rte_pktmbuf_free(m);
return -ENOMEM;
}
}
// 发送到目标端口
rte_eth_tx_burst(dst_ports[i], 0, &copy, 1);
}
return 0;
}
Note

克隆分段链时,rte_pktmbuf_clone 会为链头创建一个新的 mbuf 结构,但后续 segment 的 mbuf 不会被复制——它们通过 next 指针共享。这意味着克隆后的链和原始链共享除链头以外的所有 segment mbuf,引用计数会相应增加。释放时必须使用 rte_pktmbuf_free(而非手动操作 next 指针),确保引用计数正确递减。

四、包解析辅助库#

数据包进入 DPDK 应用后,第一步通常是解析协议头——提取 Ethernet、IP、TCP/UDP 等层的信息。DPDK 提供了一系列辅助库,定义了标准的协议头结构体和解析函数,避免开发者手动计算偏移量。

4.1 rte_ether_hdr:Ethernet 头解析#

lib/net/rte_ether.h
struct rte_ether_addr {
uint8_t addr_bytes[RTE_ETHER_ADDR_LEN]; // 6 字节 MAC 地址
} __rte_aligned(2);
struct rte_ether_hdr {
struct rte_ether_addr dst_addr; // 目标 MAC
struct rte_ether_addr src_addr; // 源 MAC
rte_be16_t ether_type; // 上层协议类型
} __rte_aligned(2);
// ether_type 常见值:
// RTE_ETHER_TYPE_IPV4 (0x0800) — IPv4
// RTE_ETHER_TYPE_IPV6 (0x086DD) — IPv6
// RTE_ETHER_TYPE_ARP (0x0806) — ARP
// RTE_ETHER_TYPE_VLAN (0x8100) — 802.1Q VLAN
// RTE_ETHER_TYPE_QINQ (0x88A8) — 802.1ad QinQ

4.2 rte_ipv4_hdr:IPv4 头解析#

lib/net/rte_ip.h
struct rte_ipv4_hdr {
uint8_t version_ihl; // 版本(4bit) + 头部长度(4bit)
uint8_t type_of_service; // DSCP + ECN
rte_be16_t total_length; // IP 包总长度
rte_be16_t packet_id; // 标识
rte_be16_t fragment_offset; // 分片偏移 + 标志
uint8_t time_to_live; // TTL
uint8_t next_proto_id; // 上层协议号
rte_be16_t hdr_checksum; // 头部校验和
rte_be32_t src_addr; // 源 IP
rte_be32_t dst_addr; // 目标 IP
} __rte_aligned(2);
// 常用辅助函数
uint16_t rte_ipv4_hdr_len(const struct rte_ipv4_hdr *ipv4_hdr);
// 返回 IP 头部长度(字节),根据 IHL 字段计算
int rte_ipv4_frag_pkt_is_fragmented(const struct rte_ipv4_hdr *hdr);
// 判断 IP 包是否被分片

4.3 rte_tcp_hdr / rte_udp_hdr:传输层头解析#

lib/net/rte_tcp.h
struct rte_tcp_hdr {
rte_be16_t src_port; // 源端口
rte_be16_t dst_port; // 目标端口
rte_be32_t sent_seq; // 序列号
rte_be32_t recv_ack; // 确认号
uint8_t data_off; // 数据偏移(4bit)+ 保留(4bit)
uint8_t tcp_flags; // TCP 标志位
rte_be16_t rx_win; // 接收窗口
rte_be16_t cksum; // 校验和
rte_be16_t tcp_urp; // 紧急指针
} __rte_aligned(2);
// TCP 标志位常量
#define RTE_TCP_CWR_FLAG 0x80 // 拥塞窗口减少
#define RTE_TCP_ECE_FLAG 0x40 // ECN-Echo
#define RTE_TCP_URG_FLAG 0x20 // 紧急
#define RTE_TCP_ACK_FLAG 0x10 // 确认
#define RTE_TCP_PSH_FLAG 0x08 // 推送
#define RTE_TCP_RST_FLAG 0x04 // 重置
#define RTE_TCP_SYN_FLAG 0x02 // 同步
#define RTE_TCP_FIN_FLAG 0x01 // 结束
// lib/net/rte_udp.h
struct rte_udp_hdr {
rte_be16_t src_port; // 源端口
rte_be16_t dst_port; // 目标端口
rte_be16_t dgram_len; // 数据报长度
rte_be16_t dgram_cksum; // 校验和
} __rte_aligned(2);

4.4 rte_net:包类型检测#

手动逐层解析协议头既繁琐又容易出错。DPDK 提供了 rte_net 库,通过 rte_net_get_ptype() 自动检测数据包的协议类型:

lib/net/rte_net.h
uint32_t rte_net_get_ptype(struct rte_mbuf *m,
struct rte_net_hdr_lens *pvlens,
uint32_t layers);
// 返回值:位掩码,表示检测到的协议层
// layers 参数:指定要检测的协议层(L2/L3/L4)

包类型返回值是位掩码,常用宏提取各层信息:

// 提取 L2 类型
uint32_t l2_type = rte_net_ptype_get_l2(ptype);
// RTE_PTYPE_L2_ETHER, RTE_PTYPE_L2_ETHER_VLAN, RTE_PTYPE_L2_ETHER_QINQ ...
// 提取 L3 类型
uint32_t l3_type = rte_net_ptype_get_l3(ptype);
// RTE_PTYPE_L3_IPV4, RTE_PTYPE_L3_IPV6, RTE_PTYPE_L3_IPV4_EXT ...
// 提取 L4 类型
uint32_t l4_type = rte_net_ptype_get_l4(ptype);
// RTE_PTYPE_L4_TCP, RTE_PTYPE_L4_UDP, RTE_PTYPE_L4_ICMP ...

4.5 包解析完整示例#

#include <rte_net.h>
#include <rte_ether.h>
#include <rte_ip.h>
#include <rte_tcp.h>
#include <rte_udp.h>
// 完整的包解析函数
static inline int
parse_packet(struct rte_mbuf *m, struct rte_net_hdr_lens *hdr_lens)
{
// 方法一:使用 rte_net_get_ptype 自动检测
uint32_t ptype = rte_net_get_ptype(m, hdr_lens,
RTE_PTYPE_L2_MASK | RTE_PTYPE_L3_MASK |
RTE_PTYPE_L4_MASK);
// 方法二:手动逐层解析(更灵活,可处理非标准包)
struct rte_ether_hdr *eth;
struct rte_ipv4_hdr *ipv4;
struct rte_tcp_hdr *tcp;
struct rte_udp_hdr *udp;
// L2: Ethernet
eth = rte_pktmbuf_mtod(m, struct rte_ether_hdr *);
uint16_t ether_type = rte_be_to_cpu_16(eth->ether_type);
// 处理 VLAN 标签
uint32_t offset = sizeof(struct rte_ether_hdr);
if (ether_type == RTE_ETHER_TYPE_VLAN) {
// 跳过 802.1Q VLAN 标签(4 字节)
struct rte_vlan_hdr *vlan =
rte_pktmbuf_mtod_offset(m, struct rte_vlan_hdr *, offset);
ether_type = rte_be_to_cpu_16(vlan->eth_proto);
offset += sizeof(struct rte_vlan_hdr);
}
// L3: IPv4
if (ether_type == RTE_ETHER_TYPE_IPV4) {
ipv4 = rte_pktmbuf_mtod_offset(m, struct rte_ipv4_hdr *, offset);
offset += rte_ipv4_hdr_len(ipv4);
uint32_t src_ip = rte_be_to_cpu_32(ipv4->src_addr);
uint32_t dst_ip = rte_be_to_cpu_32(ipv4->dst_addr);
// L4: TCP / UDP
if (ipv4->next_proto_id == IPPROTO_TCP) {
tcp = rte_pktmbuf_mtod_offset(m, struct rte_tcp_hdr *, offset);
uint16_t src_port = rte_be_to_cpu_16(tcp->src_port);
uint16_t dst_port = rte_be_to_cpu_16(tcp->dst_port);
printf("TCP: %u -> %u\n", src_port, dst_port);
} else if (ipv4->next_proto_id == IPPROTO_UDP) {
udp = rte_pktmbuf_mtod_offset(m, struct rte_udp_hdr *, offset);
uint16_t src_port = rte_be_to_cpu_16(udp->src_port);
uint16_t dst_port = rte_be_to_cpu_16(udp->dst_port);
printf("UDP: %u -> %u\n", src_port, dst_port);
}
}
return 0;
}
Note

rte_pktmbuf_mtod_offset(m, type, offset) 是 DPDK 中访问 mbuf 数据的常用宏,它返回 m->data_addr + offset 处的指针,并强制转换为指定类型。这个宏只访问链头 mbuf 的数据区域,如果 offset 超出链头 data_len,需要使用 rte_pktmbuf_read() 代替。

五、CRC 与 Hash 硬件卸载#

数据包处理中,校验和计算和哈希查找是两个高频操作。如果全部由 CPU 软件完成,会消耗大量 CPU 周期。DPDK 支持将这些操作卸载到网卡硬件,释放 CPU 用于业务逻辑。

5.1 CRC 硬件卸载#

以太网帧尾部有 4 字节的 CRC(FCS,Frame Check Sequence),用于检测传输错误。传统内核网络栈在收包时由网卡硬件自动剥离 CRC 并校验,DPDK 同样支持这一特性。

RX 侧 CRC 卸载

// 端口配置时启用 CRC 剥离
struct rte_eth_conf port_conf = {
.rxmode = {
.offloads = DEV_RX_OFFLOAD_CRC_STRIP, // 硬件自动剥离 CRC
// 注意:较新 DPDK 版本中 CRC_STRIP 已默认启用
// 旧版本需要显式设置
},
};

TX 侧 CRC 卸载

发送时,网卡硬件自动在帧尾部插入 CRC,应用无需计算。这是几乎所有网卡都支持的基本功能,DPDK 默认启用。

校验和卸载

比 CRC 更重要的是 IP/TCP/UDP 校验和的硬件卸载:

// RX 侧:网卡校验并标记校验和是否正确
struct rte_eth_conf port_conf = {
.rxmode = {
.offloads = DEV_RX_OFFLOAD_IPV4_CKSUM | // IPv4 头部校验和
DEV_RX_OFFLOAD_UDP_CKSUM | // UDP 校验和
DEV_RX_OFFLOAD_TCP_CKSUM, // TCP 校验和
},
};
// 收包后检查校验和状态
struct rte_mbuf *m;
// ... 收包后 ...
if (m->ol_flags & RTE_MBUF_F_RX_IP_CKSUM_BAD) {
// IPv4 校验和错误,丢弃
rte_pktmbuf_free(m);
continue;
}
if (m->ol_flags & RTE_MBUF_F_RX_L4_CKSUM_BAD) {
// TCP/UDP 校验和错误,丢弃
rte_pktmbuf_free(m);
continue;
}
// TX 侧:网卡计算并插入校验和
struct rte_eth_conf port_conf = {
.txmode = {
.offloads = DEV_TX_OFFLOAD_IPV4_CKSUM | // 硬件计算 IPv4 校验和
DEV_TX_OFFLOAD_UDP_CKSUM | // 硬件计算 UDP 校验和
DEV_TX_OFFLOAD_TCP_CKSUM, // 硬件计算 TCP 校验和
},
};
// 发包前设置 mbuf 标志,告诉网卡需要计算哪些校验和
m->ol_flags = RTE_MBUF_F_TX_IPV4 | // 使用 IPv4
RTE_MBUF_F_TX_IP_CKSUM | // 硬件计算 IP 校验和
RTE_MBUF_F_TX_TCP_CKSUM; // 硬件计算 TCP 校验和
m->l2_len = sizeof(struct rte_ether_hdr);
m->l3_len = sizeof(struct rte_ipv4_hdr);
m->l4_len = sizeof(struct rte_tcp_hdr);

5.2 rte_hash:软件哈希库#

当硬件卸载不可用或需要更灵活的哈希功能时,DPDK 提供了 rte_hash 库——基于 Cuckoo Hash 的高性能哈希表。

Cuckoo Hash 的特点:

  • O(1) 最坏情况查找:每个 key 最多映射到两个 bucket,查找最多访问两次
  • 高空间利用率:负载因子可达 90% 以上
  • 无锁读:读操作不需要加锁,适合读多写少场景
  • 批量查找:支持一次查找多个 key,利用缓存局部性
#include <rte_hash.h>
// 创建哈希表
struct rte_hash_parameters hash_params = {
.name = "flow_table",
.entries = 1 << 20, // 最大条目数
.key_len = sizeof(struct flow_key), // key 长度
.hash_func = rte_jhash, // 哈希函数
.hash_func_init_val = 0,
.socket_id = rte_socket_id(),
};
struct rte_hash *flow_table = rte_hash_create(&hash_params);
// 查找
struct flow_key key = { .src_ip = src, .dst_ip = dst, .src_port = sp, .dst_port = dp };
int32_t index = rte_hash_lookup(flow_table, &key);
if (index >= 0) {
// 找到,index 是数据数组中的位置
void *data = rte_hash_get_key_with_position(flow_table, index, NULL);
}
// 插入
int ret = rte_hash_add_key_data(flow_table, &key, flow_data);
// 删除
ret = rte_hash_del_key(flow_table, &key);

5.3 RSS:接收端缩放#

RSS(Receive Side Scaling)是网卡硬件的哈希分流机制。网卡对每个收到的数据包计算一个哈希值,根据哈希值将包分配到不同的 RX 队列。每个 RX 队列绑定到一个 lcore,从而实现多核并行处理。

graph LR subgraph "RSS 哈希分流" NIC["网卡 NIC<br/>计算 RSS Hash"] --> Q0["RX Queue 0<br/>lcore 0"] NIC --> Q1["RX Queue 1<br/>lcore 1"] NIC --> Q2["RX Queue 2<br/>lcore 2"] NIC --> Q3["RX Queue 3<br/>lcore 3"] end subgraph "同一流的包" P1["包 A<br/>hash = 0x3A2F"] --> Q1 P2["包 B<br/>hash = 0x7B1C"] --> Q3 P3["包 C<br/>hash = 0x3A2F"] --> Q1 end style NIC fill:#e3f2fd,stroke:#1565c0 style Q0 fill:#fff3e0,stroke:#e65100 style Q1 fill:#e8f5e9,stroke:#2e7d32 style Q2 fill:#fff3e0,stroke:#e65100 style Q3 fill:#e8f5e9,stroke:#2e7d32

RSS 配置:

// 配置 RSS
struct rte_eth_rss_conf rss_conf = {
.rss_key = rss_key, // RSS 哈希密钥(40 字节)
.rss_key_len = 40,
.rss_hf = RTE_ETH_RSS_IP | // 对 IP 头部(src/dst IP)计算哈希
RTE_ETH_RSS_TCP | // 对 TCP 头部(src/dst port)计算哈希
RTE_ETH_RSS_UDP, // 对 UDP 头部(src/dst port)计算哈希
};
struct rte_eth_conf port_conf = {
.rxmode = {
.mq_mode = RTE_ETH_MQ_RX_RSS, // 启用 RSS 多队列模式
},
.rx_adv_conf = {
.rss_conf = rss_conf,
},
};

RSS 的关键特性:

  • 保序性:同一五元组(src_ip, dst_ip, src_port, dst_port, protocol)的包始终哈希到同一队列,保证流的包序不被打乱
  • 哈希算法:大多数网卡使用 Toeplitz Hash,DPDK 提供了 rte_thash 库用于软件端复现相同的哈希值
  • RSS Key:默认 key 可能导致哈希分布不均匀,建议使用非对称 RSS key

5.4 rte_thash:Toeplitz 哈希#

在某些场景下,DPDK 应用需要在软件端计算与网卡 RSS 相同的哈希值——例如确定某个流会被分配到哪个 RX 队列,或者在控制面预计算分流策略。

#include <rte_thash.h>
// 使用与网卡相同的 RSS key 计算 Toeplitz 哈希
uint8_t rss_key[40];
// 从网卡获取 RSS key
rte_eth_dev_rss_hash_conf_get(port_id, &rss_conf);
memcpy(rss_key, rss_conf.rss_key, 40);
// 计算五元组的 Toeplitz 哈希
union rte_thash_tuple tuple;
tuple.v4.src_addr = rte_cpu_to_be_32(src_ip);
tuple.v4.dst_addr = rte_cpu_to_be_32(dst_ip);
tuple.v4.sport = rte_cpu_to_be_16(src_port);
tuple.v4.dport = rte_cpu_to_be_16(dst_port);
uint32_t hash = rte_softrss((uint32_t *)&tuple,
RTE_THASH_V4_L4_LEN, rss_key);
// 根据 hash 确定目标 RX 队列
uint16_t queue_id = hash % nb_rx_queues;
Warning

RSS 哈希只考虑五元组,不考虑包的大小和内容。这意味着如果网络中有大量相同五元组的流量(例如大量 HTTP 请求到同一个服务器),这些流量会被集中到一个 RX 队列和一个 lcore 上,造成负载不均衡。解决方案包括:使用非对称 RSS key、在应用层做二次分流(如 rte_distributor)、或使用支持更细粒度分流的网卡。

六、TSO 与 LRO#

TCP 协议对段大小有限制(MSS,通常为 1460 字节)。当应用发送大量数据时,TCP 层需要将数据分割为多个不超过 MSS 的段。如果由 CPU 完成分割,每次发送大块数据都需要多次协议头填充和校验和计算。TSO 和 LRO 将这些操作卸载到网卡硬件,大幅减少 CPU 开销。

6.1 TSO(TCP Segmentation Offload)#

TSO 允许应用将一个大的 TCP 数据块(远超 MSS)直接交给网卡,由网卡硬件完成 TCP 分段。应用只需构造一个包含完整数据的大 mbuf,网卡自动将其分割为多个符合 MSS 的 TCP 段并发送。

// 启用 TSO
struct rte_eth_conf port_conf = {
.txmode = {
.offloads = DEV_TX_OFFLOAD_TCP_TSO,
},
};
// 发送大段数据时使用 TSO
void send_large_data(struct rte_mbuf *m, uint16_t port_id)
{
// 设置 mbuf 标志
m->ol_flags = RTE_MBUF_F_TX_IPV4 |
RTE_MBUF_F_TX_IP_CKSUM |
RTE_MBUF_F_TX_TCP_CKSUM |
RTE_MBUF_F_TX_TCP_SEG; // 启用 TSO
// 设置各层头部长度
m->l2_len = sizeof(struct rte_ether_hdr); // 14
m->l3_len = sizeof(struct rte_ipv4_hdr); // 20
m->l4_len = sizeof(struct rte_tcp_hdr); // 20
// 设置 TSO 分段大小(MSS)
m->tso_segsz = 1460; // TCP MSS
// 发送——网卡会自动将大包分割为多个 TCP 段
rte_eth_tx_burst(port_id, 0, &m, 1);
}

TSO 的性能影响极为显著:

指标无 TSO有 TSO改善
TX 包数(64KB 数据)~44 个 TCP 段1 个大 mbuf减少 44 倍
CPU 周期/包~200 cycles~200 cycles总周期减少 44 倍
PCIe DMA 事务44 次1 次大幅减少
缓存压力44 次 mbuf 访问1 次 mbuf 访问大幅降低
Note

TSO 在 Ch05 介绍的向量化发送路径中尤为重要。没有 TSO 时,64KB 的数据需要 44 次 TX 描述符和 DMA 操作;启用 TSO 后,只需 1 次 TX 描述符提交,网卡内部完成分段。这不仅减少了 CPU 开销,还降低了 PCIe 总线压力,对高吞吐场景至关重要。

6.2 LRO(Large Receive Offload)#

LRO 是 TSO 的逆操作:网卡将收到的多个同一 TCP 流的段合并为一个大段,再交给驱动。这减少了应用需要处理的包数量,降低了协议栈处理开销。

// 启用 LRO
struct rte_eth_conf port_conf = {
.rxmode = {
.offloads = DEV_RX_OFFLOAD_TCP_LRO,
.max_lro_pkt_size = 65535, // LRO 合并后的最大包大小
},
};
// 收包后检查是否为 LRO 合并包
struct rte_mbuf *m;
// ... 收包后 ...
if (m->ol_flags & RTE_MBUF_F_RX_LRO_CKSUM) {
// 这是一个 LRO 合并包
// m->pkt_len 可能远大于 MSS
// m->tso_segsz 包含合并前的段数量信息
}

LRO 的注意事项:

  • 保序性:LRO 必须保证合并的段是连续的(序列号递增),不能乱序合并
  • TCP 标志:带有 URG、SYN、FIN、RST 标志的段不能合并
  • 窗口更新:窗口大小变化的段不能合并
  • 超时:如果一段时间内没有收到新的段,网卡应该提交已合并的数据,避免无限等待
Warning

LRO 在转发场景中需要谨慎使用。如果 DPDK 应用是路由器/交换机,直接转发收到的包,那么 LRO 合并后的包可能超过出接口的 MTU,需要重新分片。此外,LRO 会改变包的到达模式——原本是多个小包,现在变成一个大包——这可能影响基于包计数或包间延迟的算法。在转发场景中,通常建议禁用 LRO。

七、Scatter-Gather I/O#

Scatter-Gather I/O(也称为向量 I/O)允许一次 I/O 操作处理多个不连续的内存区域。在 DPDK 中,这体现为多段 mbuf 的发送和接收。

7.1 多段 TX 发送#

当 mbuf 是分段链(nb_segs > 1)时,网卡需要将多个 mbuf 的数据通过一次 DMA 操作发送出去。这需要网卡支持 Scatter-Gather,并且驱动正确配置 TX 描述符。

// 检查网卡是否支持多段发送
struct rte_eth_dev_info dev_info;
rte_eth_dev_info_get(port_id, &dev_info);
if (dev_info.tx_offload_capa & DEV_TX_OFFLOAD_MULTI_SEGS) {
printf("网卡支持多段发送\n");
}
// 启用多段发送
struct rte_eth_conf port_conf = {
.txmode = {
.offloads = DEV_TX_OFFLOAD_MULTI_SEGS,
},
};

多段发送的典型场景:

IP 分片:当转发路径的出接口 MTU 小于入接口时,需要对 IP 包进行分片。每个分片对应一个 mbuf,它们通过 next 指针链接成链,一次 rte_eth_tx_burst 即可发送整个分片链。

// IP 分片示例(简化)
struct rte_mbuf *fragments[8];
int n_frags = rte_ipv4_fragment_packet(
original_mbuf,
fragments, // 分片后的 mbuf 数组
8, // 最多 8 个分片
mtu, // 出接口 MTU
mbuf_pool, // 用于分配新 mbuf 的 mempool
mbuf_pool
);
// fragments[0] 是第一个分片,可能是一个分段链
// 一次发送所有分片
rte_eth_tx_burst(port_id, queue_id, fragments, n_frags);

隧道封装:在原始数据包前面添加隧道头(如 VXLAN、GRE),如果 headroom 空间不足,需要分配一个新的 mbuf 作为链头,包含隧道头,原始 mbuf 作为后续 segment。

// VXLAN 封装示例(简化)
struct rte_mbuf *outer = rte_pktmbuf_alloc(mbuf_pool);
// 在 outer mbuf 中构造外层头部
// Ethernet + IPv4 + UDP + VXLAN
struct rte_ether_hdr *eth = rte_pktmbuf_mtod(outer, struct rte_ether_hdr *);
// ... 填充外层头部 ...
outer->data_len = outer_hdr_len;
outer->pkt_len = outer_hdr_len + original_mbuf->pkt_len;
// 将原始 mbuf 链接到 outer
outer->next = original_mbuf;
outer->nb_segs = 1 + original_mbuf->nb_segs;
// 发送——网卡通过 Scatter-Gather 一次发送 outer + original
rte_eth_tx_burst(port_id, queue_id, &outer, 1);

7.2 多段 RX 接收#

Ch05 中提到,当网卡收到的数据包超过单个 mbuf 的数据区域大小时,PMD 会将数据分散到多个 mbuf 中,形成分段链。这需要:

  1. 网卡支持 Scatter-Gather 接收
  2. RX 队列配置时启用多段接收
// 启用多段接收
struct rte_eth_conf port_conf = {
.rxmode = {
.offloads = DEV_RX_OFFLOAD_SCATTER,
},
};
// 配置 RX 队列时指定每个描述符可用的 mbuf 数量
struct rte_eth_rxconf rxconf = {
.rx_free_thresh = 32,
};
// 对于支持 Scatter-Gather 的网卡,需要为每个描述符
// 预分配多个 mbuf(通过 rx_seg 配置)

7.3 Scatter-Gather 性能考量#

因素单段 mbuf多段 mbuf(Scatter-Gather)
DMA 事务1 次N 次(N = segment 数量)
描述符消耗1 个N 个
缓存局部性好(数据连续)差(数据分散)
适用场景普通帧(< 2KB)Jumbo Frame、隧道封装、IP 分片
Warning

Scatter-Gather 发送虽然方便,但性能不如单段发送。每个额外的 segment 都需要额外的 TX 描述符和 DMA 操作,增加了 PCIe 总线压力和网卡处理开销。在高吞吐场景中,如果可能,应尽量使用 headroom 空间来添加头部,避免产生多段 mbuf。DPDK 的 rte_pktmbuf_prepend() 可以在 headroom 中添加数据,只要 headroom 空间足够,就不会产生新的 segment。

八、动手实践#

实践 1:创建 Ring 并基准测试 SPSC vs MPMC#

本实践对比 SPSC 和 MPMC 模式的吞吐量差异,直观感受无锁 vs CAS 的性能差距。

ring_benchmark.c
#include <rte_ring.h>
#include <rte_lcore.h>
#include <rte_cycles.h>
#include <rte_mbuf.h>
#include <rte_mempool.h>
#define RING_SIZE 4096
#define BURST_SIZE 32
#define ITERATIONS 10000000
static struct rte_ring *spsc_ring;
static struct rte_ring *mpmc_ring;
static struct rte_mempool *mbuf_pool;
// SPSC 生产者
static int
spsc_producer(__rte_unused void *arg)
{
void *bufs[BURST_SIZE];
uint64_t start = rte_rdtsc();
for (int i = 0; i < ITERATIONS / BURST_SIZE; i++) {
// 模拟入队(使用空指针,只测 ring 性能)
for (int j = 0; j < BURST_SIZE; j++)
bufs[j] = (void *)(uintptr_t)(i * BURST_SIZE + j);
while (rte_ring_sp_enqueue_bulk(spsc_ring, bufs,
BURST_SIZE, NULL) == 0)
; // 自旋等待
}
uint64_t end = rte_rdtsc();
printf("SPSC Producer: %.2f Mops/s\n",
(double)ITERATIONS / ((end - start) / rte_get_timer_hz()) / 1e6);
return 0;
}
// SPSC 消费者
static int
spsc_consumer(__rte_unused void *arg)
{
void *bufs[BURST_SIZE];
uint64_t count = 0;
uint64_t start = rte_rdtsc();
while (count < ITERATIONS) {
uint16_t n = rte_ring_sc_dequeue_bulk(spsc_ring,
(void **)bufs, BURST_SIZE, NULL);
count += n;
}
uint64_t end = rte_rdtsc();
printf("SPSC Consumer: %.2f Mops/s\n",
(double)ITERATIONS / ((end - start) / rte_get_timer_hz()) / 1e6);
return 0;
}
// MPMC 生产者(与 SPSC 类似,使用 rte_ring_mp_enqueue_bulk)
// MPMC 消费者(与 SPSC 类似,使用 rte_ring_mc_dequeue_bulk)
// ... 代码结构相同,替换函数名即可 ...
int main(int argc, char **argv)
{
rte_eal_init(argc, argv);
// 创建 SPSC ring
spsc_ring = rte_ring_create("spsc", RING_SIZE,
rte_socket_id(), RING_F_SP_ENQ | RING_F_SC_DEQ);
// 创建 MPMC ring(无 SP/SC 标志)
mpmc_ring = rte_ring_create("mpmc", RING_SIZE,
rte_socket_id(), 0);
// 分别运行 SPSC 和 MPMC 基准测试
// ... 启动 lcore 执行生产者/消费者 ...
rte_eal_cleanup();
return 0;
}

预期结果:SPSC 吞吐量通常比 MPMC 高 30%~50%,因为 SPSC 不需要 CAS 操作和重试循环。当生产者/消费者数量增加时,MPMC 的性能下降更明显。

实践 2:使用 rte_net 辅助库解析数据包#

本实践演示如何使用 rte_net_get_ptype() 和手动解析两种方式提取数据包的协议信息。

packet_parser.c
#include <rte_net.h>
#include <rte_ether.h>
#include <rte_ip.h>
#include <rte_tcp.h>
#include <rte_udp.h>
static void
parse_with_rte_net(struct rte_mbuf *m)
{
struct rte_net_hdr_lens hdr_lens;
uint32_t ptype = rte_net_get_ptype(m, &hdr_lens,
RTE_PTYPE_L2_MASK | RTE_PTYPE_L3_MASK |
RTE_PTYPE_L4_MASK);
printf("Packet type: 0x%08x\n", ptype);
// L2
if (RTE_PTYPE_L2_MASK & ptype)
printf(" L2: %s\n",
(RTE_PTYPE_L2_ETHER & ptype) ? "Ethernet" : "Other");
// L3
if (RTE_PTYPE_L3_IPV4 & ptype)
printf(" L3: IPv4 (hdr_len=%u)\n", hdr_lens.l3_len);
else if (RTE_PTYPE_L3_IPV6 & ptype)
printf(" L3: IPv6 (hdr_len=%u)\n", hdr_lens.l3_len);
// L4
if (RTE_PTYPE_L4_TCP & ptype)
printf(" L4: TCP (hdr_len=%u)\n", hdr_lens.l4_len);
else if (RTE_PTYPE_L4_UDP & ptype)
printf(" L4: UDP (hdr_len=%u)\n", hdr_lens.l4_len);
else if (RTE_PTYPE_L4_ICMP & ptype)
printf(" L4: ICMP\n");
}
static void
parse_manual(struct rte_mbuf *m)
{
struct rte_ether_hdr *eth = rte_pktmbuf_mtod(m, struct rte_ether_hdr *);
uint16_t etype = rte_be_to_cpu_16(eth->ether_type);
printf("Ethernet: " RTE_ETHER_ADDR_PRT_FMT " -> "
RTE_ETHER_ADDR_PRT_FMT "\n",
RTE_ETHER_ADDR_BYTES(&eth->src_addr),
RTE_ETHER_ADDR_BYTES(&eth->dst_addr));
if (etype == RTE_ETHER_TYPE_IPV4) {
struct rte_ipv4_hdr *ip = (struct rte_ipv4_hdr *)(eth + 1);
printf("IPv4: %u.%u.%u.%u -> %u.%u.%u.%u, proto=%u\n",
IP4_FMT(rte_be_to_cpu_32(ip->src_addr)),
IP4_FMT(rte_be_to_cpu_32(ip->dst_addr)),
ip->next_proto_id);
}
}

实践 3:启用 TSO 并测量 TX 性能提升#

本实践对比启用/禁用 TSO 时的发送性能。

tso_benchmark.c
#include <rte_ethdev.h>
#include <rte_mbuf.h>
// 构造一个大的 TCP 数据包(64KB)
static struct rte_mbuf *
build_large_tcp_packet(struct rte_mempool *pool, uint32_t payload_len)
{
struct rte_mbuf *m = rte_pktmbuf_alloc(pool);
if (m == NULL) return NULL;
// 填充 Ethernet + IP + TCP 头部 + payload
uint32_t total_len = sizeof(struct rte_ether_hdr) +
sizeof(struct rte_ipv4_hdr) +
sizeof(struct rte_tcp_hdr) + payload_len;
// 如果 payload 超过单个 mbuf 的数据区域,需要分段链
// ...(此处简化,假设 mbuf 足够大或使用分段链)
// 填充头部
struct rte_ether_hdr *eth = rte_pktmbuf_mtod(m, struct rte_ether_hdr *);
// ... 填充 Ethernet 头 ...
struct rte_ipv4_hdr *ip = (struct rte_ipv4_hdr *)(eth + 1);
ip->total_length = rte_cpu_to_be_16(total_len - sizeof(struct rte_ether_hdr));
ip->next_proto_id = IPPROTO_TCP;
// ... 填充 IP 头 ...
struct rte_tcp_hdr *tcp = (struct rte_tcp_hdr *)(ip + 1);
// ... 填充 TCP 头 ...
m->pkt_len = total_len;
m->data_len = total_len;
return m;
}
// 测试 TSO 性能
static void
benchmark_tso(uint16_t port_id, struct rte_mempool *pool)
{
struct rte_mbuf *pkts[BURST_SIZE];
uint64_t total_bytes = 0;
uint64_t start, end;
// 构造 64KB 的 TCP 包
for (int i = 0; i < BURST_SIZE; i++) {
pkts[i] = build_large_tcp_packet(pool, 65536);
pkts[i]->ol_flags = RTE_MBUF_F_TX_IPV4 |
RTE_MBUF_F_TX_IP_CKSUM |
RTE_MBUF_F_TX_TCP_CKSUM |
RTE_MBUF_F_TX_TCP_SEG; // TSO
pkts[i]->tso_segsz = 1460; // MSS
pkts[i]->l2_len = sizeof(struct rte_ether_hdr);
pkts[i]->l3_len = sizeof(struct rte_ipv4_hdr);
pkts[i]->l4_len = sizeof(struct rte_tcp_hdr);
}
start = rte_rdtsc();
for (int iter = 0; iter < 1000; iter++) {
uint16_t sent = rte_eth_tx_burst(port_id, 0, pkts, BURST_SIZE);
total_bytes += sent * 65536;
}
end = rte_rdtsc();
double gbps = (double)total_bytes * 8 /
((end - start) / rte_get_timer_hz()) / 1e9;
printf("TSO enabled: %.2f Gbps\n", gbps);
}

预期结果:启用 TSO 后,64KB 数据只需一次 TX 操作(网卡内部拆分为 44 个 TCP 段),相比不启用 TSO 时需要 44 次 TX 操作,吞吐量提升 1050 倍。

实践 4:实现基本 L2 转发与包解析#

本实践综合运用本章所学,实现一个完整的 L2 转发应用:收包→解析→修改 MAC→发包。

// l2fwd_parse.c — 基于 DPDK 的 L2 转发与包解析
#include <rte_eal.h>
#include <rte_ethdev.h>
#include <rte_mbuf.h>
#include <rte_ether.h>
#include <rte_ip.h>
#include <rte_tcp.h>
#include <rte_net.h>
#define RX_RING_SIZE 1024
#define TX_RING_SIZE 1024
#define BURST_SIZE 32
#define MAX_PKT_BURST 32
static const struct rte_eth_conf port_conf = {
.rxmode = {
.max_lro_pkt_size = 65535,
.offloads = DEV_RX_OFFLOAD_IPV4_CKSUM |
DEV_RX_OFFLOAD_UDP_CKSUM |
DEV_RX_OFFLOAD_TCP_CKSUM,
},
.txmode = {
.offloads = DEV_TX_OFFLOAD_IPV4_CKSUM |
DEV_TX_OFFLOAD_UDP_CKSUM |
DEV_TX_OFFLOAD_TCP_CKSUM |
DEV_TX_OFFLOAD_MULTI_SEGS,
},
};
// L2 转发:交换源/目标 MAC 地址
static inline void
l2fwd_mac_swap(struct rte_mbuf *m)
{
struct rte_ether_hdr *eth = rte_pktmbuf_mtod(m, struct rte_ether_hdr *);
struct rte_ether_addr tmp;
rte_ether_addr_copy(&eth->dst_addr, &tmp);
rte_ether_addr_copy(&eth->src_addr, &eth->dst_addr);
rte_ether_addr_copy(&tmp, &eth->src_addr);
}
// 包解析与统计
struct port_stats {
uint64_t rx_packets;
uint64_t tx_packets;
uint64_t ipv4_packets;
uint64_t ipv6_packets;
uint64_t tcp_packets;
uint64_t udp_packets;
uint64_t dropped;
};
static struct port_stats stats;
static inline void
parse_and_count(struct rte_mbuf *m)
{
struct rte_net_hdr_lens hdr_lens;
uint32_t ptype = rte_net_get_ptype(m, &hdr_lens,
RTE_PTYPE_L2_MASK | RTE_PTYPE_L3_MASK |
RTE_PTYPE_L4_MASK);
if (ptype & RTE_PTYPE_L3_IPV4)
stats.ipv4_packets++;
else if (ptype & RTE_PTYPE_L3_IPV6)
stats.ipv6_packets++;
if (ptype & RTE_PTYPE_L4_TCP)
stats.tcp_packets++;
else if (ptype & RTE_PTYPE_L4_UDP)
stats.udp_packets++;
}
// 主循环
static void
main_loop(uint16_t rx_port, uint16_t tx_port)
{
struct rte_mbuf *bufs[BURST_SIZE];
while (1) {
// 收包
uint16_t nb_rx = rte_eth_rx_burst(rx_port, 0, bufs, BURST_SIZE);
if (unlikely(nb_rx == 0))
continue;
stats.rx_packets += nb_rx;
for (uint16_t i = 0; i < nb_rx; i++) {
// 检查校验和
if (bufs[i]->ol_flags & RTE_MBUF_F_RX_IP_CKSUM_BAD) {
rte_pktmbuf_free(bufs[i]);
stats.dropped++;
bufs[i] = NULL;
continue;
}
// 解析并统计
parse_and_count(bufs[i]);
// L2 转发:交换 MAC 地址
l2fwd_mac_swap(bufs[i]);
}
// 发包
uint16_t nb_tx = rte_eth_tx_burst(tx_port, 0, bufs, nb_rx);
stats.tx_packets += nb_tx;
// 释放未发送的 mbuf
if (unlikely(nb_tx < nb_rx)) {
stats.dropped += (nb_rx - nb_tx);
for (uint16_t i = nb_tx; i < nb_rx; i++)
rte_pktmbuf_free(bufs[i]);
}
}
}

小结#

本章深入剖析了 DPDK 数据平面的核心机制,这些机制共同构成了数据包在 DPDK 应用中高效流动的基础设施:

机制核心作用关键设计
rte_ring无锁数据传递SPSC 无原子操作、MPMC CAS 两阶段提交、RTS/HTS 可扩展优化
rte_mbuf 分段链大包处理nb_segs + next 指针、Jumbo Frame 分散存储
零拷贝避免数据拷贝clone/attach 共享数据区域、引用计数管理
包解析辅助库协议头提取rte_ether/rte_ip/rte_tcp 结构体、rte_net_get_ptype 自动检测
CRC/Hash 硬件卸载释放 CPU网卡计算校验和、RSS 哈希分流、rte_thash 软件复现
TSO/LRO减少 TX/RX 包数网卡 TCP 分段/合并、吞吐量提升 10~50 倍
Scatter-Gather I/O非连续内存 I/O多段 mbuf 一次 DMA、隧道封装和 IP 分片

数据平面设计的核心原则可以概括为三点:

  1. 零拷贝:数据始终在原始 mbuf 中,只有指针在 ring 中流转。clone/attach 是零拷贝的两种实现方式
  2. 无锁:SPSC 完全无锁,MPMC 通过 CAS 实现无锁并发,RTS/HTS 进一步优化可扩展性
  3. 批量处理:ring 的 burst 操作、PMD 的向量化收发包、TSO 的大段发送——都是”攒一批再处理”的思路

这些原则与 Ch04 中的大页内存、mempool 预分配,以及 Ch05 中的轮询模式、向量化收发包一脉相承——它们共同构成了 DPDK”用空间换时间、用确定性换吞吐”的设计哲学。

参考资料#

支持与分享

如果这篇文章对你有帮助,欢迎支持作者或分享给更多人

DPDK 数据平面核心机制
https://blog.souloss.com/posts/high-perf-networking/high-perf-networking-dpdk-data-plane-core/
作者
Souloss
发布于
2025-05-02
许可协议
CC BY-NC-SA 4.0

部分信息可能已经过时

相关文章 智能推荐
1
VPP 与 FD.io 数据平面
高性能网络 深入 VPP 与 FD.io 数据平面——矢量包处理(Vector Packet Processing)的 I-Cache 友好设计、插件框架、图节点实现与类型、CLI 与 binary API、预取优化与批量处理、VPP + DPDK 输入节点——掌握下一代数据平面的架构与编程。
2
DPDK 架构全景与核心概念
高性能网络 深入 DPDK 整体架构——EAL 抽象层初始化流程、内存管理概览、轮询模式驱动概念、环形缓冲区与 mbuf、lcore 线程模型、Meson 构建系统,以及从零编写第一个 DPDK 应用——helloworld 全代码走读。
3
DPDK 内存管理
高性能网络 深入 DPDK 内存管理——大页(Huge Pages)配置与 TLB 加速、mempool 分级缓存架构、rte_mbuf 结构与分段链、rte_malloc 分配器、NUMA 感知与跨节点惩罚、memzone 与 IOVA 模式——理解 DPDK 内存管理是掌握高性能数据平面的基石。
4
DPDK 多核与并发模型
高性能网络 深入 DPDK 多核与并发模型——lcore 模型与 CPU 亲和性、Run-to-Completion 模型、Pipeline 模型与 rte_ring 跨核通信、原子操作与内存屏障、RCU 机制(rte_rcu_qsbr)、Eventdev 事件驱动框架——掌握多核数据平面编程的完整技术栈。
5
OVS-DPDK 与虚拟交换
高性能网络 深入 OVS-DPDK 与虚拟交换——OVS-DPDK 架构与内核 OVS 对比、dpdkvhostuser 端口类型、vhost-user 协议与共享 virtio 环、virtio 前后端、VM-to-VM 交换路径、流分类(EMC/DPCLS/megaflows)、性能调优——掌握云环境虚拟网络加速的完整技术栈。