某云计算平台的虚拟交换机部署在 16 核服务器上,但实际只有 4 个核在满负荷运转,其余 12 个核几乎空闲。原因很简单:多核扩展性没做好,锁竞争和 cache bouncing 吞掉了大部分性能增益。DPDK 的多核模型正是为解决这类问题而生。
多核是 DPDK 性能的真正放大器。单核轮询可以把收包推到 40~80 Mpps,但一个 32 核的服务器如果只用一个核,等于把 96% 的算力白白浪费。DPDK 的设计从一开始就围绕多核展开——lcore 绑核让每个线程独占一个物理核心,RSS 哈希把流量均匀分散到多个队列,无锁数据结构让核间通信零等待,RCU 机制让读多写少的场景彻底消除锁争用。
前几章分别深入了 DPDK 的内存管理(Ch04)、轮询模式驱动(Ch05)和数据平面核心机制(Ch06)。本章将把视角从单核扩展到多核,系统剖析 DPDK 的并发模型:lcore 模型与 CPU 亲和性、Run-to-Completion 与 Pipeline 两种经典架构、rte_ring 跨核通信、原子操作与内存屏障、RCU 机制、以及 Eventdev 事件驱动框架。理解了这些,你就掌握了多核数据平面编程的完整技术栈。
一、lcore 模型与 CPU 亲和性
1.1 EAL lcore 映射
DPDK 的 lcore(Logical Core)是一个逻辑执行单元,本质上是绑定到特定 CPU 核心的 pthread 线程。EAL(Environment Abstraction Layer)在初始化时根据 -l 或 -c 参数建立 lcore ID 到 CPU ID 的映射关系:
# 方式一:-l 指定 lcore 列表(lcore 0→CPU 0, lcore 1→CPU 2, lcore 2→CPU 4, ...)dpdk-app -l 0,2,4,6 -n 4 -- -p 0x3
# 方式二:-c 指定 CPU 位掩码(bit 0=CPU 0, bit 1=CPU 1, ...)dpdk-app -c 0x55 -n 4 -- -p 0x3EAL 初始化时为每个 lcore 创建一个 pthread,并通过 pthread_setaffinity_np() 将其绑定到对应的 CPU 核心。绑定之后,操作系统调度器不会再将该线程迁移到其他核心,从而保证:
- 缓存局部性:线程始终在同一核心的 L1/L2 缓存上工作,不会因迁移导致缓存失效
- 延迟确定性:不会被其他线程抢占,轮询周期稳定
- NUMA 感知:线程与内存分配在同一 NUMA 节点,避免跨节点访问延迟
// EAL 初始化中的 lcore 绑核逻辑(简化)int rte_eal_init(int argc, char **argv){ // 解析 -l/-c 参数,建立 lcore 映射表 eal_parse_args(argc, argv);
// 为每个 worker lcore 创建线程并绑核 RTE_LCORE_FOREACH_WORKER(lcore_id) { // 设置 CPU 亲和性 CPU_ZERO(&cpuset); CPU_SET(lcore_config[lcore_id].cpu_id, &cpuset);
// 创建线程 pthread_create(&lcore_config[lcore_id].thread_id, NULL, eal_thread_loop, (void *)(uintptr_t)lcore_id);
// 绑定到指定 CPU 核心 pthread_setaffinity_np(lcore_config[lcore_id].thread_id, sizeof(cpuset), &cpuset); } return 0;}1.2 Master lcore 与 Worker lcore
DPDK 将 lcore 分为两类:
| 类型 | lcore ID | 职责 | 特点 |
|---|---|---|---|
| Master lcore | 0 | 初始化、配置、管理 | 主线程,调用 rte_eal_init() 的线程 |
| Worker lcore | 1, 2, 3, … | 数据平面处理 | 由 Master 通过远程启动 API 激活 |
Master lcore 负责 DPDK 的初始化工作(内存、PMD、队列配置等),然后通过 rte_eal_mp_remote_launch() 将工作函数分发到各个 Worker lcore 上执行。Worker lcore 启动后进入无限循环,持续执行被分配的处理函数。
// Master lcore 启动 Worker lcorestatic int lcore_main(void *arg){ uint16_t port_id = (uint16_t)(uintptr_t)arg; struct rte_mbuf *bufs[BURST_SIZE];
printf("lcore %u 处理 port %u\n", rte_lcore_id(), port_id);
while (!force_quit) { // 收包 uint16_t nb_rx = rte_eth_rx_burst(port_id, 0, bufs, BURST_SIZE); if (unlikely(nb_rx == 0)) continue;
// 处理 + 发包 for (uint16_t i = 0; i < nb_rx; i++) { // 数据包处理逻辑... process_packet(bufs[i]); }
rte_eth_tx_burst(port_id, 0, bufs, nb_rx);
// 释放已发送的 mbuf for (uint16_t i = 0; i < nb_rx; i++) rte_pktmbuf_free(bufs[i]); } return 0;}
int main(int argc, char **argv){ rte_eal_init(argc, argv);
// 在所有 Worker lcore 上启动 lcore_main rte_eal_mp_remote_launch(lcore_main, NULL, CALL_MASTER);
// 等待所有 lcore 完成 rte_eal_mp_wait_lcore(); return 0;}1.3 rte_eal_mp_remote_launch 机制
rte_eal_mp_remote_launch() 是 Master lcore 向 Worker lcore 分发任务的核心 API。其内部机制如下:
// rte_eal_mp_remote_launch 的内部实现(简化)int rte_eal_mp_remote_launch(lcore_function_t *f, void *arg, enum rte_rmt_call_master_t call_master){ int lcore_id;
// 遍历所有 Worker lcore,设置待执行函数 RTE_LCORE_FOREACH_WORKER(lcore_id) { lcore_config[lcore_id].f = f; lcore_config[lcore_id].arg = arg; // 唤醒 Worker lcore(通过管道写触发) rte_atomic_store_explicit(&lcore_config[lcore_id].state, WAIT, rte_memory_order_release); }
// 如果需要,Master lcore 也执行该函数 if (call_master == CALL_MASTER) { f(arg); }
return 0;}Worker lcore 的主循环在 eal_thread_loop() 中,它不断检查 lcore_config[state],一旦发现被设置了新的处理函数,就开始执行:
// Worker lcore 的主循环(简化)static __attribute__((noreturn)) void *eal_thread_loop(__attribute__((unused)) void *arg){ unsigned lcore_id = rte_lcore_id();
while (1) { // 等待 Master 分配任务 while (lcore_config[lcore_id].state != WAIT) pthread_cond_wait(...);
// 执行分配的函数 lcore_function_t *f = lcore_config[lcore_id].f; void *f_arg = lcore_config[lcore_id].arg; lcore_config[lcore_id].ret = f(f_arg);
// 标记完成 lcore_config[lcore_id].state = FINISHED; }}1.4 lcore 模型全景
lcore 绑核只是第一步。要真正实现多核线性扩展,还需要确保数据包被均匀地分发到各个 lcore——这就是 RSS(Receive Side Scaling)的工作。在第 5 章中已经介绍了 RSS 的原理,本章将聚焦 lcore 之间如何协作处理这些被分发过来的数据包。
二、Run-to-Completion 模型
2.1 核心思想
Run-to-Completion(RTC,运行至完成)是 DPDK 最简单也最常用的多核模型。其核心思想是:每个 lcore 独立完成数据包的全部处理流程,从收包到发包,不与其他 lcore 共享任何中间状态。
在 RTC 模型中,每个 lcore 绑定一个或多个 RX 队列,收包后直接在当前 lcore 上完成解析、查表、修改、发包的全部工作。不同 lcore 之间没有数据依赖,因此不需要跨核同步,天然实现了无锁并行。
2.2 数据流路径
┌──────────────────────────────────────────┐ │ 网卡 (NIC) │ │ Queue 0 Queue 1 Queue 2 Queue 3 │ └────┬────────┬────────┬────────┬──────────┘ │ │ │ │ ┌─────────▼──┐ ┌──▼────────┐ ┌──────▼───┐ ┌──▼─────────┐ │ lcore 0 │ │ lcore 1 │ │ lcore 2 │ │ lcore 3 │ │ │ │ │ │ │ │ │ │ rx_burst() │ │ rx_burst()│ │rx_burst()│ │ rx_burst() │ │ ↓ │ │ ↓ │ │ ↓ │ │ ↓ │ │ parse() │ │ parse() │ │ parse() │ │ parse() │ │ ↓ │ │ ↓ │ │ ↓ │ │ ↓ │ │ lookup() │ │ lookup() │ │ lookup() │ │ lookup() │ │ ↓ │ │ ↓ │ │ ↓ │ │ ↓ │ │ modify() │ │ modify() │ │ modify() │ │ modify() │ │ ↓ │ │ ↓ │ │ ↓ │ │ ↓ │ │ tx_burst() │ │ tx_burst()│ │tx_burst()│ │ tx_burst() │ └────────────┘ └───────────┘ └──────────┘ └────────────┘2.3 代码实现
#define BURST_SIZE 32
// Run-to-Completion 主循环static int rtc_lcore_main(void *arg){ uint16_t port_id = (uint16_t)(uintptr_t)arg; uint16_t queue_id = rte_lcore_id() - 1; // 每个 lcore 使用自己的队列 struct rte_mbuf *bufs[BURST_SIZE]; struct rte_mbuf *tx_bufs[BURST_SIZE]; uint16_t nb_tx;
printf("RTC: lcore %u 处理 port %u queue %u\n", rte_lcore_id(), port_id, queue_id);
while (!force_quit) { // 1. 收包 uint16_t nb_rx = rte_eth_rx_burst(port_id, queue_id, bufs, BURST_SIZE); if (unlikely(nb_rx == 0)) continue;
uint16_t tx_count = 0;
// 2. 逐包处理:解析 → 查表 → 修改 for (uint16_t i = 0; i < nb_rx; i++) { struct rte_ether_hdr *eth = rte_pktmbuf_mtod(bufs[i], struct rte_ether_hdr *);
// 简单的 L2 转发:交换源/目的 MAC struct rte_ether_addr tmp; rte_ether_addr_copy(ð->src_addr, &tmp); rte_ether_addr_copy(ð->dst_addr, ð->src_addr); rte_ether_addr_copy(&tmp, ð->dst_addr);
tx_bufs[tx_count++] = bufs[i]; }
// 3. 发包 nb_tx = rte_eth_tx_burst(port_id, queue_id, tx_bufs, tx_count);
// 4. 释放未发送的 mbuf if (unlikely(nb_tx < tx_count)) { for (uint16_t i = nb_tx; i < tx_count; i++) rte_pktmbuf_free(tx_bufs[i]); } } return 0;}2.4 RTC 模型的优缺点
| 优势 | 劣势 |
|---|---|
| 无锁设计,无跨核同步开销 | 每个核必须完成全部处理,功能扩展性差 |
| 缓存友好,数据局部性极佳 | 处理步骤多的场景下单核吞吐受限 |
| 编程模型简单,易于调试 | 流表等共享状态需要特殊处理(RCU/读写锁) |
| 线性扩展——核数翻倍吞吐翻倍 | 负载不均时某些核空闲,利用率下降 |
RTC 模型的线性扩展有一个前提:流量必须被 RSS 均匀地分发到各个队列。如果流量特征导致 RSS 哈希倾斜(例如大量流量来自同一源 IP),某些 lcore 会过载而其他 lcore 空闲。此时需要考虑使用 Flow Director 或 rte_flow 做更精细的流量分发。
三、Pipeline 模型
3.1 核心思想
Pipeline(流水线)模型将数据包的处理流程拆分为多个阶段(Stage),每个阶段由不同的 lcore 负责。数据包在阶段之间通过 rte_ring 无锁队列传递。这与 CPU 指令流水线的思想一致——不同阶段并行工作,整体吞吐量取决于最慢的阶段。
3.2 Pipeline 架构图
3.3 代码实现
// Pipeline 模型:三阶段流水线// Stage 1: 收包 lcorestatic int pipeline_rx_stage(void *arg){ uint16_t port_id = (uint16_t)(uintptr_t)arg; struct rte_mbuf *bufs[BURST_SIZE]; struct rte_ring *rx_ring = rte_ring_lookup("rx_ring");
while (!force_quit) { uint16_t nb_rx = rte_eth_rx_burst(port_id, 0, bufs, BURST_SIZE); if (nb_rx == 0) continue;
// 将收到的 mbuf 指针放入 ring,传递给下一阶段 uint16_t enqueued = rte_ring_sp_enqueue_burst( rx_ring, (void *const *)bufs, nb_rx, NULL);
// 释放未入队的 mbuf(ring 满时背压) for (uint16_t i = enqueued; i < nb_rx; i++) rte_pktmbuf_free(bufs[i]); } return 0;}
// Stage 2: 解析 + 查表 lcorestatic int pipeline_process_stage(void *arg){ struct rte_mbuf *bufs[BURST_SIZE]; struct rte_ring *rx_ring = rte_ring_lookup("rx_ring"); struct rte_ring *tx_ring = rte_ring_lookup("tx_ring");
while (!force_quit) { // 从 rx_ring 取出数据包 uint16_t nb_deq = rte_ring_sc_dequeue_burst( rx_ring, (void **)bufs, BURST_SIZE, NULL); if (nb_deq == 0) continue;
// 处理每个数据包 for (uint16_t i = 0; i < nb_deq; i++) { process_packet(bufs[i]); }
// 将处理后的数据包放入 tx_ring uint16_t enqueued = rte_ring_sp_enqueue_burst( tx_ring, (void *const *)bufs, nb_deq, NULL);
for (uint16_t i = enqueued; i < nb_deq; i++) rte_pktmbuf_free(bufs[i]); } return 0;}
// Stage 3: 发包 lcorestatic int pipeline_tx_stage(void *arg){ uint16_t port_id = (uint16_t)(uintptr_t)arg; struct rte_mbuf *bufs[BURST_SIZE]; struct rte_ring *tx_ring = rte_ring_lookup("tx_ring");
while (!force_quit) { uint16_t nb_deq = rte_ring_sc_dequeue_burst( tx_ring, (void **)bufs, BURST_SIZE, NULL); if (nb_deq == 0) continue;
uint16_t nb_tx = rte_eth_tx_burst(port_id, 0, bufs, nb_deq);
for (uint16_t i = nb_tx; i < nb_deq; i++) rte_pktmbuf_free(bufs[i]); } return 0;}3.4 RTC vs Pipeline 对比
| 维度 | Run-to-Completion | Pipeline |
|---|---|---|
| 核间通信 | 无(每核独立完成) | 有(rte_ring 传递) |
| 缓存局部性 | 极佳(数据不离开当前核) | 较差(数据跨核传递) |
| 编程复杂度 | 低 | 中(需管理 ring 和阶段间协议) |
| 功能扩展 | 差(每核重复全部逻辑) | 好(新功能加新阶段) |
| 延迟 | 低(单核直通) | 较高(经过多个 ring) |
| 负载均衡 | 依赖 RSS 均匀性 | 可通过 ring 水位线动态调节 |
| 适用场景 | 简单转发、L2/L3 交换 | 复杂处理、多协议栈、防火墙 |
实际生产中,两种模型经常混合使用。例如 OVS-DPDK 在入向使用 Pipeline(收包→解析→查表→动作),但在查表后对简单转发动作使用 RTC(直接在当前 lcore 发包),对需要复杂处理的动作(如隧道封装)才走后续 pipeline 阶段。
四、rte_ring 跨核通信
4.1 为什么选择 rte_ring
在 Pipeline 模型中,不同 lcore 之间需要传递数据包指针。传统的线程间通信方式(互斥锁、条件变量)在数据平面场景下性能不可接受——一个 pthread_mutex_lock() 可能需要数百纳秒,而数据包处理的目标是每包几十纳秒。
rte_ring 是 DPDK 提供的无锁环形缓冲区,专门用于高吞吐的跨核数据传递。在第 6 章中已经深入了 rte_ring 的内部实现,这里聚焦它在跨核通信场景中的使用模式。
4.2 SPSC Ring:Pipeline 的最佳选择
在 Pipeline 模型中,两个相邻阶段之间通常是一对一的关系——一个 lcore 入队、一个 lcore 出队。这是典型的 SPSC(Single Producer Single Consumer)场景,也是 rte_ring 性能最高的模式。
// 创建 SPSC ringstruct rte_ring *ring;ring = rte_ring_create("pipeline_ring", 1024, rte_socket_id(), RING_F_SP_ENQ | RING_F_SC_DEQ);
// 生产者 lcore:批量入队static int producer_lcore(void *arg){ struct rte_ring *ring = rte_ring_lookup("pipeline_ring"); struct rte_mbuf *bufs[BURST_SIZE];
while (!force_quit) { uint16_t nb_rx = rte_eth_rx_burst(port_id, queue_id, bufs, BURST_SIZE); if (nb_rx == 0) continue;
// SP 批量入队:无需 CAS,无需重试 uint16_t enqueued = rte_ring_sp_enqueue_burst( ring, (void *const *)bufs, nb_rx, NULL);
// 处理未入队的包(背压) for (uint16_t i = enqueued; i < nb_rx; i++) rte_pktmbuf_free(bufs[i]); } return 0;}
// 消费者 lcore:批量出队static int consumer_lcore(void *arg){ struct rte_ring *ring = rte_ring_lookup("pipeline_ring"); struct rte_mbuf *bufs[BURST_SIZE];
while (!force_quit) { // SC 批量出队:无需 CAS,无需重试 uint16_t nb_deq = rte_ring_sc_dequeue_burst( ring, (void **)bufs, BURST_SIZE, NULL); if (nb_deq == 0) continue;
// 处理数据包 for (uint16_t i = 0; i < nb_deq; i++) process_packet(bufs[i]); } return 0;}4.3 批量操作与背压
rte_ring 的批量操作(burst)是性能的关键。批量入队/出队将多次指针拷贝合并为一次连续内存写入/读取,充分利用 CPU 缓存行的预取能力:
// 批量入队的内部优化(简化)static __rte_always_inline unsigned intrte_ring_sp_enqueue_burst(struct rte_ring *r, void *const *obj_table, unsigned int n, unsigned int *free_space){ uint32_t prod_head = r->prod.head; uint32_t cons_tail = r->cons.tail; uint32_t free_count = r->size + cons_tail - prod_head;
// 实际入队数量 = min(请求数量, 可用空间) uint32_t actual = (n > free_count) ? free_count : n; if (actual == 0) { if (free_space) *free_space = 0; return 0; }
// 批量拷贝指针——连续内存写入,缓存行友好 const uint32_t mask = r->mask; uint32_t idx = prod_head & mask;
// 处理环形回绕:可能需要分两段拷贝 if (idx + actual <= r->size) { // 无回绕:一次 memcpy memcpy(&r->ring[idx], obj_table, actual * sizeof(void *)); } else { // 有回绕:分两段拷贝 uint32_t first_part = r->size - idx; memcpy(&r->ring[idx], obj_table, first_part * sizeof(void *)); memcpy(&r->ring[0], &obj_table[first_part], (actual - first_part) * sizeof(void *)); }
// 内存屏障 + 更新指针 rte_smp_wmb(); r->prod.head = prod_head + actual; r->prod.tail = prod_head + actual;
if (free_space) *free_space = free_count - actual; return actual;}4.4 Cache-line 对齐
rte_ring 的性能优化中,cache-line 对齐是至关重要的一环。生产者的 head/tail 和消费者的 head/tail 被放在不同的 cache-line 上,避免 false sharing:
// rte_ring 结构体中的 cache-line 对齐(DPDK 源码简化)struct rte_ring { // ... 其他字段 ...
alignas(RTE_CACHE_LINE_SIZE) struct rte_ring_headtail prod; // prod 独占一个 cache-line(64 字节)
alignas(RTE_CACHE_LINE_SIZE) struct rte_ring_headtail cons; // cons 独占一个 cache-line(64 字节)};如果 prod 和 cons 在同一个 cache-line 上,生产者更新 prod.head 时会使消费者核心上的 cache-line 失效,反之亦然。这就是 false sharing——虽然两者访问的是不同变量,但因为它们在同一 cache-line 上,导致不必要的缓存弹跳。通过 cache-line 对齐,生产者和消费者各自操作独立的 cache-line,互不干扰。
在 NUMA 系统中,rte_ring 必须创建在正确的 NUMA 节点上。如果 ring 分配在 NUMA 节点 0,但生产者在节点 1 的 CPU 上运行,每次入队操作都需要跨节点访问内存,延迟从 ~50ns 增加到 ~150ns。始终使用 rte_socket_id() 获取当前 lcore 所在的 NUMA 节点,并在该节点上创建 ring。
五、原子操作与内存屏障
5.1 为什么需要原子操作
在多核环境中,即使采用了 RTC 模型减少核间共享,某些场景仍然需要跨核访问共享数据——例如统计计数器、流表更新、全局配置。这些场景下,简单的读写操作不是原子的:
// 非原子的计数器更新——多核下会丢失更新uint64_t packet_count; // 全局计数器
// lcore 0 和 lcore 1 同时执行:packet_count++; // 编译为:load → add → store,不是原子操作// 可能的结果:两个核都 load 了相同的值,各自 +1 后 store,丢失一次更新5.2 rte_atomic 操作
DPDK 提供了 rte_atomic 系列操作,封装了编译器屏障和 CPU 原子指令:
#include <rte_atomic.h>
// 原子计数器static rte_atomic64_t total_packets;
// 原子递增rte_atomic64_inc(&total_packets);
// 原子读取uint64_t count = rte_atomic64_read(&total_packets);
// 原子比较并交换(CAS)rte_atomic64_t version;int64_t expected = 0;int success = rte_atomic64_cmpset( (volatile uint64_t *)&version, // 目标地址 (uint64_t)expected, // 期望值 (uint64_t)1 // 新值);// 如果 version == expected,则 version = 1,返回 1// 否则不做任何操作,返回 05.3 内存屏障
内存屏障(Memory Barrier/Fence)是多核编程中最微妙也最重要的概念。现代 CPU 和编译器都会对指令进行重排序,单线程下这不会影响正确性,但多线程下可能导致另一个核心看到不一致的内存状态。
DPDK 提供了不同粒度的内存屏障:
// 编译器屏障:阻止编译器重排序,但不影响 CPUrte_compiler_barrier();
// SMP 内存屏障:在所有核心之间建立全局顺序rte_smp_mb(); // 全屏障(读+写)rte_smp_rmb(); // 读屏障rte_smp_wmb(); // 写屏障
// I/O 内存屏障:用于设备寄存器访问rte_io_mb();rte_io_rmb();rte_io_wmb();屏障使用场景示例——rte_ring 的两阶段提交:
// rte_ring MPMC enqueue 中的屏障使用static __rte_always_inline unsigned int__rte_ring_move_prod_head(struct rte_ring *r, unsigned int is_sp, unsigned int n, uint32_t *old_head, uint32_t *new_head, uint32_t *free_entries){ do { // 1. 读取当前 prod.head *old_head = r->prod.head;
// 2. 编译器屏障:确保 old_head 的读取不会被重排到后面 rte_smp_rmb();
// 3. 读取 cons.tail(消费者已确认的出队位置) *free_entries = r->size + r->cons.tail - *old_head;
if (n > *free_entries) return 0;
*new_head = *old_head + n;
// 4. CAS 更新 prod.head(包含完整的内存屏障) } while (rte_atomic32_cmpset(&r->prod.head, *old_head, *new_head) == 0);
return n;}5.4 C11 Atomics:现代 DPDK 的选择
从 DPDK 21.11 开始,官方推荐使用 C11 标准的原子操作替代 rte_atomic。C11 原子提供了更清晰的内存序语义:
#include <stdatomic.h>
// C11 原子变量static atomic_uint_fast64_t total_packets;
// 原子递增(relaxed 序:无屏障,最快)atomic_fetch_add_explicit(&total_packets, 1, memory_order_relaxed);
// 原子读取(relaxed 序:仅保证原子性,不保证顺序)uint64_t count = atomic_load_explicit(&total_packets, memory_order_relaxed);
// 原子 CAS(acquire-release 序:保证 CAS 前后的读写顺序)uint64_t expected = 0;bool success = atomic_compare_exchange_strong_explicit( &version, &expected, 1, memory_order_acq_rel, // 成功时的内存序 memory_order_acquire // 失败时的内存序);5.5 无锁模式:单生产者计数器
在数据平面中,最常见的统计场景是”每个 lcore 独立计数,偶尔汇总”。这种场景下,不需要原子操作——每个 lcore 维护一个 per-lcore 计数器,汇总时只需读取所有 lcore 的计数器求和:
// Per-lcore 计数器——完全无锁static RTE_DEFINE_PER_LCORE(uint64_t, local_packet_count);
// 每个 lcore 独立更新自己的计数器(无原子操作,无屏障)local_packet_count++;
// 汇总:Master lcore 读取所有 Worker 的计数器uint64_t get_total_packets(void){ uint64_t total = 0; unsigned int lcore_id; RTE_LCORE_FOREACH_WORKER(lcore_id) { total += RTE_PER_LCORE_BY_ID(lcore_id, local_packet_count); } total += local_packet_count; // 加上 Master 的 return total;}Per-lcore 计数器是 DPDK 中最推荐的统计方式。rte_eth_xstats_get() 返回的统计信息就是 per-queue 的,每个 lcore 只更新自己队列的计数器,零同步开销。只有在需要全局实时一致视图时(如 QoS 限速),才需要原子操作。
六、RCU 机制:rte_rcu_qsbr
6.1 读多写少的困境
在数据平面中,流表(Flow Table)、路由表(FIB)、ACL 规则等数据结构是典型的”读多写少”场景——每个数据包都要查表,但表项的增删改相对稀少。如果用读写锁保护这些数据结构,读操作虽然可以并发,但写操作会阻塞所有读者,导致数据平面抖动。
RCU(Read-Copy-Update)是一种优雅的解决方案:读者无锁访问,写者创建副本修改,等所有读者离开临界区后回收旧数据。Linux 内核广泛使用 RCU 保护各种数据结构,DPDK 也提供了用户态 RCU 实现——rte_rcu_qsbr。
6.2 静默状态与宽限期
RCU 的核心概念有两个:
- 静默状态(Quiescent State, QS):读者声明”我当前不在访问 RCU 保护的数据”。在 DPDK 中,每个 lcore 在一轮处理循环的间隙(两次
rx_burst()之间)报告静默状态 - 宽限期(Grace Period):从写者发布新版本数据开始,到所有读者都至少报告了一次静默状态的时间段。宽限期结束后,旧数据可以安全回收
6.3 rte_rcu_qsbr API 与代码示例
#include <rte_rcu_qsbr.h>
#define MAX_LCORES 64
// RCU 变量struct rte_rcu_qsbr *qsv;
// 初始化 RCUint init_rcu(void){ size_t sz = rte_rcu_qsbr_get_memsize(MAX_LCORES); qsv = rte_zmalloc("rcu", sz, RTE_CACHE_LINE_SIZE); if (qsv == NULL) return -ENOMEM;
return rte_rcu_qsbr_init(qsv, MAX_LCORES);}
// 读者 lcore:注册 + 报告静默状态static int reader_lcore(void *arg){ unsigned int lcore_id = rte_lcore_id();
// 注册当前 lcore 为 RCU 读者 rte_rcu_qsbr_thread_register(qsv, lcore_id); rte_rcu_qsbr_thread_online(qsv, lcore_id);
while (!force_quit) { // 读取 RCU 保护的数据(无锁) struct flow_entry *flow = lookup_flow_table(packet); if (flow) { // 使用 flow 数据——在此期间,写者不会回收 flow process_flow(flow, packet); }
// 报告静默状态:声明"我已离开临界区" rte_rcu_qsbr_quiescent(qsv, lcore_id); }
rte_rcu_qsbr_thread_offline(qsv, lcore_id); rte_rcu_qsbr_thread_unregister(qsv, lcore_id); return 0;}
// 写者线程:更新流表void update_flow_entry(struct flow_entry *old_flow, struct flow_entry *new_flow){ // 1. 创建新版本(copy + modify) // new_flow 已经在外部创建并初始化
// 2. 原子替换指针 __atomic_store_n(&flow_table[old_flow->hash], new_flow, __ATOMIC_RELEASE);
// 3. 等待宽限期:所有读者都已离开旧数据的临界区 rte_rcu_qsbr_synchronize(qsv, RTE_QSBR_THRID_INVALID, 0);
// 4. 安全回收旧数据 rte_free(old_flow);}6.4 宽限期检查的优化
rte_rcu_qsbr_synchronize() 可以阻塞等待宽限期结束,但在数据平面中,阻塞写者线程是不可接受的。DPDK 提供了异步回调机制:
// 异步 RCU 回调:宽限期结束后自动回收旧数据struct rte_rcu_qsbr_dq *dq;
// 创建延迟回收队列struct rte_rcu_qsbr_dq_parameters params = { .name = "flow_rcu_dq", .size = 1024, .esize = sizeof(struct flow_entry *), .qsv = qsv, .trigger_reclaim_limit = 128, // 队列中超过 128 个待回收项时触发 .reclaim_max = 64, // 每次最多回收 64 个};dq = rte_rcu_qsbr_dq_create(¶ms);
// 写者:将旧数据放入延迟回收队列(不阻塞)void update_flow_async(struct flow_entry *old_flow, struct flow_entry *new_flow){ // 原子替换指针 __atomic_store_n(&flow_table[old_flow->hash], new_flow, __ATOMIC_RELEASE);
// 将旧数据入队,由 RCU 在宽限期后自动回收 rte_rcu_qsbr_dq_enqueue(dq, &old_flow, 1);}
// 定期调用(可在任意 lcore 的主循环中)void reclaim_rcu_resources(void){ rte_rcu_qsbr_dq_reclaim(dq, UINT_MAX, NULL, NULL, NULL);}RCU 不能保护写者之间的并发——同一时刻只能有一个写者修改 RCU 保护的数据结构。如果多个线程需要并发写入,必须额外使用互斥锁或自旋锁保护写路径。RCU 解决的是”读者无锁 + 写者不阻塞读者”的问题,而不是”写者并发”的问题。
七、Eventdev:事件驱动框架
7.1 从轮询到事件驱动
前面讨论的 RTC 和 Pipeline 模型都有一个共同特点:lcore 与队列的绑定关系是静态的。在 RTC 模型中,lcore 0 处理 Queue 0,lcore 1 处理 Queue 1;在 Pipeline 模型中,lcore 0 收包,lcore 1 查表,lcore 2 发包。这种静态绑定在负载均衡上存在天然缺陷——如果 Queue 0 的流量远高于 Queue 1,lcore 0 过载而 lcore 1 空闲。
Eventdev(Event Device)是 DPDK 提供的事件驱动框架,它将”数据包到达”抽象为”事件”,由调度器(Scheduler)动态地将事件分发到空闲的 Worker lcore 上。Worker 不绑定特定队列,而是从调度器获取下一个待处理的事件——哪个 lcore 空闲,事件就分给谁。
7.2 Eventdev 核心概念
Eventdev 涉及以下核心概念:
| 概念 | 说明 |
|---|---|
| Event | 事件载体,可以是数据包(RTE_EVENT_TYPE_ETHDEV)、定时器(RTE_EVENT_TYPE_TIMER)或自定义事件 |
| Queue | 事件队列,事件在 Worker 处理前暂存于此 |
| Port | 事件端口,Worker 通过 Port 从 Queue 获取事件或向 Queue 提交事件 |
| Scheduler | 调度器,负责将事件从 Queue 分发到 Port |
| PMD | Eventdev 驱动,可以是软件实现(event_sw)或硬件实现(event_dlb2) |
7.3 调度模式
Eventdev 支持三种调度模式,决定了事件在不同 Worker 之间的分发策略:
Atomic(原子调度):同一 flow 的事件始终被分发到同一个 Worker,保证同一 flow 的事件按序处理。这是最常用的模式,适用于需要流级一致性的场景(如流表查找、状态防火墙)。
Ordered(有序调度):同一 flow 的事件可以被分发到不同 Worker 并行处理,但在最终输出时恢复原始顺序。适用于计算密集型处理(如深度包检测、加密),需要并行加速但要求输出有序。
Parallel(并行调度):事件可以被分发到任意 Worker,不保证顺序。适用于无状态处理(如统计、日志),每个事件独立处理,不需要流级一致性。
7.4 硬件与软件 PMD
Eventdev 的调度器可以由硬件或软件实现:
| PMD | 类型 | 特点 |
|---|---|---|
| event_dlb2 | 硬件(Intel DSA/DLB2) | 调度在硬件中完成,延迟极低,吞吐极高 |
| event_cn10k | 硬件(Marvell CN10K) | 集成 SSO 调度器,与以太网设备紧密耦合 |
| event_sw | 软件 | 纯软件实现,可在任何平台上运行,性能较低 |
| event_dsw | 软件(分布式) | 分布式软件调度器,避免中心化瓶颈 |
硬件 Eventdev PMD 将调度逻辑卸载到专用硬件,可以显著降低调度延迟。但软件 PMD 提供了更好的灵活性和可移植性,适合开发调试阶段使用。
7.5 Eventdev 代码示例
#include <rte_eventdev.h>
#define NUM_EVENTS 32
// Eventdev 初始化int setup_eventdev(uint8_t dev_id, uint16_t nb_ports, uint16_t nb_queues){ struct rte_event_dev_config dev_conf = { .nb_events_limit = 4096, .nb_event_queues = nb_queues, .nb_event_ports = nb_ports, .nb_event_queue_flows = 1024, .nb_event_port_dequeue_depth = 64, .nb_event_port_enqueue_depth = 64, };
int ret = rte_event_dev_configure(dev_id, &dev_conf); if (ret < 0) return ret;
// 配置事件队列 struct rte_event_queue_conf queue_conf = { .schedule_type = RTE_SCHED_TYPE_ATOMIC, .priority = RTE_EVENT_DEV_PRIORITY_NORMAL, .nb_atomic_flows = 1024, .nb_atomic_order_sequences = 1024, };
for (uint16_t i = 0; i < nb_queues; i++) { rte_event_queue_setup(dev_id, i, &queue_conf); }
// 配置事件端口 struct rte_event_port_conf port_conf = { .new_event_threshold = 1024, .dequeue_depth = 64, .enqueue_depth = 64, };
for (uint16_t i = 0; i < nb_ports; i++) { rte_event_port_setup(dev_id, i, &port_conf); // 将所有队列链接到该端口 uint8_t queue_ids[nb_queues]; for (uint16_t q = 0; q < nb_queues; q++) queue_ids[q] = q; rte_event_port_link(dev_id, i, queue_ids, NULL, nb_queues); }
return rte_event_dev_start(dev_id);}
// RX Adapter:将网卡收包转换为事件int setup_rx_adapter(uint8_t eventdev_id, uint16_t eth_port_id){ struct rte_event_eth_rx_adapter_conf conf = { .event_buf_size = 1024, };
uint8_t rx_adapter_id; rte_event_eth_rx_adapter_create_ext(&rx_adapter_id, eventdev_id, rx_adapter_conf_cb, &conf);
struct rte_event_eth_rx_adapter_queue_conf queue_conf = { .rx_queue_flags = 0, .ev = { .queue_id = 0, // 事件进入 Queue 0 .priority = RTE_EVENT_DEV_PRIORITY_NORMAL, .sched_type = RTE_SCHED_TYPE_ATOMIC, }, .servicing_weight = 1, // 轮询模式 };
rte_event_eth_rx_adapter_queue_add(rx_adapter_id, eth_port_id, -1, &queue_conf); rte_event_eth_rx_adapter_start(rx_adapter_id); return 0;}
// Worker lcore:事件驱动处理static int eventdev_worker(void *arg){ uint8_t dev_id = (uint8_t)(uintptr_t)arg; uint8_t port_id = rte_lcore_id(); // 每个 lcore 使用自己的 port struct rte_event events[NUM_EVENTS];
while (!force_quit) { // 从调度器获取事件 uint16_t nb_deq = rte_event_dequeue_burst(dev_id, port_id, events, NUM_EVENTS, 0); if (nb_deq == 0) continue;
for (uint16_t i = 0; i < nb_deq; i++) { struct rte_event *ev = &events[i]; struct rte_mbuf *mbuf = ev->mbuf;
// 处理数据包 process_packet(mbuf);
// 如果需要继续下一阶段处理,重新入队 if (needs_further_processing(mbuf)) { ev->queue_id = 1; // 发送到下一个队列 rte_event_enqueue_burst(dev_id, port_id, ev, 1); } else { // 处理完成,释放 mbuf rte_pktmbuf_free(mbuf); } } } return 0;}7.6 Eventdev vs RTC/Pipeline
| 维度 | RTC | Pipeline | Eventdev |
|---|---|---|---|
| 负载均衡 | 依赖 RSS,可能不均 | 静态分配,可能不均 | 动态调度,自动均衡 |
| 核间通信 | 无 | rte_ring | Eventdev 内部调度 |
| 编程模型 | 最简单 | 中等 | 较复杂(需理解调度模式) |
| 延迟 | 最低 | 中等 | 调度器引入额外延迟 |
| 吞吐 | 单核高,多核依赖 RSS | 受最慢阶段限制 | 动态均衡,整体吞吐高 |
| 硬件加速 | 无 | 无 | 硬件调度器(DLB2/CN10K) |
| 适用场景 | 简单转发 | 多阶段处理 | 负载波动大、需动态均衡 |
Eventdev 不是 RTC/Pipeline 的替代品,而是补充。在负载稳定、RSS 分流均匀的场景下,RTC 的性能仍然最优——因为它完全没有调度开销。Eventdev 的优势在于负载波动大、流量模式复杂的场景,如电信级 vEPC/UPF,不同用户的流量特征差异巨大,静态绑定无法实现有效均衡。
八、动手实践
实践 1:多核 RTC 转发器
构建一个 4 核 RTC 转发器,每个 lcore 处理一个 RX 队列,实现 L2 转发。观察 RSS 哈希是否将流量均匀分发到各个队列:
# 启动 testpmd,4 个 lcore,4 个队列dpdk-testpmd -l 0-4 -n 4 -- -i --rxq=4 --txq=4 --rss-ip
# testpmd 中查看各队列收包统计show port xstats all
# 使用 pktgen 发送不同源 IP 的流量,观察 RSS 分发# 预期:4 个队列的收包数大致相等验证要点:
- 各队列收包数是否均匀(偏差 < 10%)
- 单核吞吐量是否与核数线性增长
perf top观察各 lcore 的 CPU 利用率是否接近 100%
实践 2:Pipeline 模型与性能对比
将实践 1 的 RTC 转发器改造为 3 阶段 Pipeline(收包→处理→发包),使用 rte_ring 传递数据包。对比两种模型的吞吐量和延迟:
// 创建跨核 ringstruct rte_ring *rx_ring = rte_ring_create("rx_ring", 4096, rte_socket_id(), RING_F_SP_ENQ | RING_F_SC_DEQ);struct rte_ring *tx_ring = rte_ring_create("tx_ring", 4096, rte_socket_id(), RING_F_SP_ENQ | RING_F_SC_DEQ);
// 启动 3 个 lcore 分别执行收包、处理、发包rte_eal_remote_launch(rx_stage, (void *)(uintptr_t)port_id, 1);rte_eal_remote_launch(process_stage, NULL, 2);rte_eal_remote_launch(tx_stage, (void *)(uintptr_t)port_id, 3);验证要点:
- Pipeline 模型的吞吐量是否低于 RTC(因 ring 传递开销)
- Pipeline 模型的延迟是否高于 RTC(经过多个 ring)
- 调整 ring 大小(256/1024/4096)对性能的影响
实践 3:RCU 流表更新
使用 rte_rcu_qsbr 实现一个可动态更新的流表。Reader lcore 查表转发,Writer 线程定期添加/删除流表项:
// 流表结构(RCU 保护)struct flow_table_entry { uint32_t src_ip; uint32_t dst_ip; uint16_t src_port; uint16_t dst_port; uint8_t proto; uint16_t output_port;} __rte_cache_aligned;
// 查表(读者,无锁)static inline struct flow_table_entry *flow_lookup(struct flow_table *tbl, struct rte_mbuf *pkt){ // 计算哈希 uint32_t hash = flow_hash(pkt); // 读取指针(原子 load,RCU 保护) struct flow_table_entry *entry = __atomic_load_n(&tbl->entries[hash], __ATOMIC_ACQUIRE); return entry;}
// 更新流表(写者,RCU 保护)void flow_table_update(struct flow_table *tbl, uint32_t hash, struct flow_table_entry *new_entry){ struct flow_table_entry *old_entry = __atomic_exchange_n(&tbl->entries[hash], new_entry, __ATOMIC_ACQ_REL);
if (old_entry) { // 等待宽限期后回收旧表项 rte_rcu_qsbr_synchronize(qsv, RTE_QSBR_THRID_INVALID, 0); rte_free(old_entry); }}验证要点:
- 流表更新期间,Reader lcore 是否出现延迟抖动
- 使用
rte_rcu_qsbr_dq异步回收 vs 同步synchronize的性能差异 - 宽限期的典型时长(与 lcore 数量和处理循环频率相关)
实践 4:Eventdev 原子调度
使用 Eventdev 的软件 PMD(event_dsw)构建一个事件驱动的转发器。对比 Atomic 和 Parallel 两种调度模式下的吞吐量和延迟:
# 使用 event_dsw PMD# 在应用启动时指定 --vdev=event_dsw0dpdk-app --vdev=event_dsw0 -l 0-4 -n 4 -- -p 0x3验证要点:
- Atomic 模式下,同一 flow 的事件是否始终由同一 Worker 处理
- Parallel 模式下,吞吐量是否高于 Atomic(无流级串行化)
- 软件调度器的 CPU 开销占比(
perf top观察调度器函数)
小结
本章系统剖析了 DPDK 的多核与并发模型,这是从单核性能到多核扩展的关键跨越。核心要点如下:
-
lcore 模型:DPDK 通过 EAL 将 pthread 绑定到 CPU 核心,Master lcore 负责初始化和管理,Worker lcore 执行数据平面处理。
rte_eal_mp_remote_launch()是任务分发的核心 API。 -
Run-to-Completion:每个 lcore 独立完成收包→处理→发包的全流程,无锁无同步,缓存局部性极佳,是简单转发场景的最优选择。线性扩展的前提是 RSS 均匀分流。
-
Pipeline:将处理流程拆分为多个阶段,通过
rte_ring传递数据包。功能扩展性好,适合复杂处理场景,但 ring 传递引入额外延迟和缓存失效。 -
rte_ring 跨核通信:SPSC 模式是 Pipeline 的最佳选择,批量操作和 cache-line 对齐是性能关键。NUMA 感知是必须的——跨节点 ring 访问延迟增加 3 倍。
-
原子操作与内存屏障:
rte_atomic和 C11 atomics 提供原子操作,内存屏障保证多核可见性。Per-lcore 计数器是最推荐的统计方式,完全无锁。 -
RCU 机制:
rte_rcu_qsbr实现用户态 RCU,读者无锁访问,写者等宽限期后回收旧数据。异步回调(rte_rcu_qsbr_dq)避免写者阻塞,是生产环境的首选。 -
Eventdev:事件驱动框架通过动态调度实现负载均衡,支持 Atomic/Ordered/Parallel 三种调度模式。硬件 PMD(DLB2/CN10K)提供极低调度延迟,适合电信级场景。
选择哪种模型,取决于你的场景:
- 简单转发、负载均匀 → RTC
- 多阶段处理、功能复杂 → Pipeline
- 负载波动大、需动态均衡 → Eventdev
- 读多写少的共享数据 → RCU
参考资料
- DPDK Programmer’s Guide — Lcore — lcore 模型与 CPU 亲和性的官方文档
- DPDK Programmer’s Guide — Ring Library — rte_ring 无锁环形缓冲区的设计与 API
- DPDK Programmer’s Guide — RCU Library — rte_rcu_qsbr 的原理与使用指南
- DPDK Programmer’s Guide — Eventdev — Eventdev 事件驱动框架的完整文档
- Is Parallel Programming Hard, and if so, what can you do about it? — Paul E. McKenney 的 RCU 权威教材,深入理解 RCU 的理论基础
- Memory Barriers: a Hardware View for Software Hackers — Paul E. McKenney 关于内存屏障的经典论文,理解 CPU 内存模型的基础
- C11 Atomic Operations in DPDK — DPDK 官方关于 C11 原子操作使用的最佳实践
- Intel DLB2 Eventdev PMD — Intel DLB2 硬件 Eventdev 驱动文档
支持与分享
如果这篇文章对你有帮助,欢迎支持作者或分享给更多人
部分信息可能已经过时






