mobile wallpaper 1mobile wallpaper 2mobile wallpaper 3mobile wallpaper 4
5336 字
15 分钟
DPDK 多核与并发模型
2025-05-17

某云计算平台的虚拟交换机部署在 16 核服务器上,但实际只有 4 个核在满负荷运转,其余 12 个核几乎空闲。原因很简单:多核扩展性没做好,锁竞争和 cache bouncing 吞掉了大部分性能增益。DPDK 的多核模型正是为解决这类问题而生。

多核是 DPDK 性能的真正放大器。单核轮询可以把收包推到 40~80 Mpps,但一个 32 核的服务器如果只用一个核,等于把 96% 的算力白白浪费。DPDK 的设计从一开始就围绕多核展开——lcore 绑核让每个线程独占一个物理核心,RSS 哈希把流量均匀分散到多个队列,无锁数据结构让核间通信零等待,RCU 机制让读多写少的场景彻底消除锁争用。

前几章分别深入了 DPDK 的内存管理(Ch04)、轮询模式驱动(Ch05)和数据平面核心机制(Ch06)。本章将把视角从单核扩展到多核,系统剖析 DPDK 的并发模型:lcore 模型与 CPU 亲和性、Run-to-Completion 与 Pipeline 两种经典架构、rte_ring 跨核通信、原子操作与内存屏障、RCU 机制、以及 Eventdev 事件驱动框架。理解了这些,你就掌握了多核数据平面编程的完整技术栈。

一、lcore 模型与 CPU 亲和性#

1.1 EAL lcore 映射#

DPDK 的 lcore(Logical Core)是一个逻辑执行单元,本质上是绑定到特定 CPU 核心的 pthread 线程。EAL(Environment Abstraction Layer)在初始化时根据 -l-c 参数建立 lcore ID 到 CPU ID 的映射关系:

# 方式一:-l 指定 lcore 列表(lcore 0→CPU 0, lcore 1→CPU 2, lcore 2→CPU 4, ...)
dpdk-app -l 0,2,4,6 -n 4 -- -p 0x3
# 方式二:-c 指定 CPU 位掩码(bit 0=CPU 0, bit 1=CPU 1, ...)
dpdk-app -c 0x55 -n 4 -- -p 0x3

EAL 初始化时为每个 lcore 创建一个 pthread,并通过 pthread_setaffinity_np() 将其绑定到对应的 CPU 核心。绑定之后,操作系统调度器不会再将该线程迁移到其他核心,从而保证:

  • 缓存局部性:线程始终在同一核心的 L1/L2 缓存上工作,不会因迁移导致缓存失效
  • 延迟确定性:不会被其他线程抢占,轮询周期稳定
  • NUMA 感知:线程与内存分配在同一 NUMA 节点,避免跨节点访问延迟
// EAL 初始化中的 lcore 绑核逻辑(简化)
int rte_eal_init(int argc, char **argv)
{
// 解析 -l/-c 参数,建立 lcore 映射表
eal_parse_args(argc, argv);
// 为每个 worker lcore 创建线程并绑核
RTE_LCORE_FOREACH_WORKER(lcore_id) {
// 设置 CPU 亲和性
CPU_ZERO(&cpuset);
CPU_SET(lcore_config[lcore_id].cpu_id, &cpuset);
// 创建线程
pthread_create(&lcore_config[lcore_id].thread_id,
NULL, eal_thread_loop, (void *)(uintptr_t)lcore_id);
// 绑定到指定 CPU 核心
pthread_setaffinity_np(lcore_config[lcore_id].thread_id,
sizeof(cpuset), &cpuset);
}
return 0;
}

1.2 Master lcore 与 Worker lcore#

DPDK 将 lcore 分为两类:

类型lcore ID职责特点
Master lcore0初始化、配置、管理主线程,调用 rte_eal_init() 的线程
Worker lcore1, 2, 3, …数据平面处理由 Master 通过远程启动 API 激活

Master lcore 负责 DPDK 的初始化工作(内存、PMD、队列配置等),然后通过 rte_eal_mp_remote_launch() 将工作函数分发到各个 Worker lcore 上执行。Worker lcore 启动后进入无限循环,持续执行被分配的处理函数。

// Master lcore 启动 Worker lcore
static int lcore_main(void *arg)
{
uint16_t port_id = (uint16_t)(uintptr_t)arg;
struct rte_mbuf *bufs[BURST_SIZE];
printf("lcore %u 处理 port %u\n", rte_lcore_id(), port_id);
while (!force_quit) {
// 收包
uint16_t nb_rx = rte_eth_rx_burst(port_id, 0, bufs, BURST_SIZE);
if (unlikely(nb_rx == 0))
continue;
// 处理 + 发包
for (uint16_t i = 0; i < nb_rx; i++) {
// 数据包处理逻辑...
process_packet(bufs[i]);
}
rte_eth_tx_burst(port_id, 0, bufs, nb_rx);
// 释放已发送的 mbuf
for (uint16_t i = 0; i < nb_rx; i++)
rte_pktmbuf_free(bufs[i]);
}
return 0;
}
int main(int argc, char **argv)
{
rte_eal_init(argc, argv);
// 在所有 Worker lcore 上启动 lcore_main
rte_eal_mp_remote_launch(lcore_main, NULL, CALL_MASTER);
// 等待所有 lcore 完成
rte_eal_mp_wait_lcore();
return 0;
}

1.3 rte_eal_mp_remote_launch 机制#

rte_eal_mp_remote_launch() 是 Master lcore 向 Worker lcore 分发任务的核心 API。其内部机制如下:

// rte_eal_mp_remote_launch 的内部实现(简化)
int rte_eal_mp_remote_launch(lcore_function_t *f, void *arg,
enum rte_rmt_call_master_t call_master)
{
int lcore_id;
// 遍历所有 Worker lcore,设置待执行函数
RTE_LCORE_FOREACH_WORKER(lcore_id) {
lcore_config[lcore_id].f = f;
lcore_config[lcore_id].arg = arg;
// 唤醒 Worker lcore(通过管道写触发)
rte_atomic_store_explicit(&lcore_config[lcore_id].state,
WAIT, rte_memory_order_release);
}
// 如果需要,Master lcore 也执行该函数
if (call_master == CALL_MASTER) {
f(arg);
}
return 0;
}

Worker lcore 的主循环在 eal_thread_loop() 中,它不断检查 lcore_config[state],一旦发现被设置了新的处理函数,就开始执行:

// Worker lcore 的主循环(简化)
static __attribute__((noreturn)) void *
eal_thread_loop(__attribute__((unused)) void *arg)
{
unsigned lcore_id = rte_lcore_id();
while (1) {
// 等待 Master 分配任务
while (lcore_config[lcore_id].state != WAIT)
pthread_cond_wait(...);
// 执行分配的函数
lcore_function_t *f = lcore_config[lcore_id].f;
void *f_arg = lcore_config[lcore_id].arg;
lcore_config[lcore_id].ret = f(f_arg);
// 标记完成
lcore_config[lcore_id].state = FINISHED;
}
}

1.4 lcore 模型全景#

flowchart TB subgraph EAL初始化["EAL 初始化阶段"] INIT["rte_eal_init()"] --> PARSE["解析 -l/-c 参数<br/>建立 lcore→CPU 映射"] PARSE --> CREATE["为每个 lcore 创建 pthread"] CREATE --> BIND["pthread_setaffinity_np()<br/>绑定到指定 CPU 核心"] end subgraph 运行时["运行时阶段"] MASTER["Master lcore (ID=0)<br/>初始化 / 配置 / 管理"] MASTER --> LAUNCH["rte_eal_mp_remote_launch()"] LAUNCH --> W1["Worker lcore 1<br/>数据平面处理"] LAUNCH --> W2["Worker lcore 2<br/>数据平面处理"] LAUNCH --> W3["Worker lcore 3<br/>数据平面处理"] LAUNCH --> WN["Worker lcore N<br/>数据平面处理"] end EAL初始化 --> 运行时 style EAL初始化 fill:#e3f2fd,stroke:#1565c0 style 运行时 fill:#e8f5e9,stroke:#2e7d32 style MASTER fill:#fff3e0,stroke:#e65100 style W1 fill:#f3e5f5,stroke:#6a1b9a style W2 fill:#f3e5f5,stroke:#6a1b9a style W3 fill:#f3e5f5,stroke:#6a1b9a style WN fill:#f3e5f5,stroke:#6a1b9a
Note

lcore 绑核只是第一步。要真正实现多核线性扩展,还需要确保数据包被均匀地分发到各个 lcore——这就是 RSS(Receive Side Scaling)的工作。在第 5 章中已经介绍了 RSS 的原理,本章将聚焦 lcore 之间如何协作处理这些被分发过来的数据包。

二、Run-to-Completion 模型#

2.1 核心思想#

Run-to-Completion(RTC,运行至完成)是 DPDK 最简单也最常用的多核模型。其核心思想是:每个 lcore 独立完成数据包的全部处理流程,从收包到发包,不与其他 lcore 共享任何中间状态

在 RTC 模型中,每个 lcore 绑定一个或多个 RX 队列,收包后直接在当前 lcore 上完成解析、查表、修改、发包的全部工作。不同 lcore 之间没有数据依赖,因此不需要跨核同步,天然实现了无锁并行。

2.2 数据流路径#

┌──────────────────────────────────────────┐
│ 网卡 (NIC) │
│ Queue 0 Queue 1 Queue 2 Queue 3 │
└────┬────────┬────────┬────────┬──────────┘
│ │ │ │
┌─────────▼──┐ ┌──▼────────┐ ┌──────▼───┐ ┌──▼─────────┐
│ lcore 0 │ │ lcore 1 │ │ lcore 2 │ │ lcore 3 │
│ │ │ │ │ │ │ │
│ rx_burst() │ │ rx_burst()│ │rx_burst()│ │ rx_burst() │
│ ↓ │ │ ↓ │ │ ↓ │ │ ↓ │
│ parse() │ │ parse() │ │ parse() │ │ parse() │
│ ↓ │ │ ↓ │ │ ↓ │ │ ↓ │
│ lookup() │ │ lookup() │ │ lookup() │ │ lookup() │
│ ↓ │ │ ↓ │ │ ↓ │ │ ↓ │
│ modify() │ │ modify() │ │ modify() │ │ modify() │
│ ↓ │ │ ↓ │ │ ↓ │ │ ↓ │
│ tx_burst() │ │ tx_burst()│ │tx_burst()│ │ tx_burst() │
└────────────┘ └───────────┘ └──────────┘ └────────────┘

2.3 代码实现#

#define BURST_SIZE 32
// Run-to-Completion 主循环
static int rtc_lcore_main(void *arg)
{
uint16_t port_id = (uint16_t)(uintptr_t)arg;
uint16_t queue_id = rte_lcore_id() - 1; // 每个 lcore 使用自己的队列
struct rte_mbuf *bufs[BURST_SIZE];
struct rte_mbuf *tx_bufs[BURST_SIZE];
uint16_t nb_tx;
printf("RTC: lcore %u 处理 port %u queue %u\n",
rte_lcore_id(), port_id, queue_id);
while (!force_quit) {
// 1. 收包
uint16_t nb_rx = rte_eth_rx_burst(port_id, queue_id,
bufs, BURST_SIZE);
if (unlikely(nb_rx == 0))
continue;
uint16_t tx_count = 0;
// 2. 逐包处理:解析 → 查表 → 修改
for (uint16_t i = 0; i < nb_rx; i++) {
struct rte_ether_hdr *eth = rte_pktmbuf_mtod(bufs[i],
struct rte_ether_hdr *);
// 简单的 L2 转发:交换源/目的 MAC
struct rte_ether_addr tmp;
rte_ether_addr_copy(&eth->src_addr, &tmp);
rte_ether_addr_copy(&eth->dst_addr, &eth->src_addr);
rte_ether_addr_copy(&tmp, &eth->dst_addr);
tx_bufs[tx_count++] = bufs[i];
}
// 3. 发包
nb_tx = rte_eth_tx_burst(port_id, queue_id, tx_bufs, tx_count);
// 4. 释放未发送的 mbuf
if (unlikely(nb_tx < tx_count)) {
for (uint16_t i = nb_tx; i < tx_count; i++)
rte_pktmbuf_free(tx_bufs[i]);
}
}
return 0;
}

2.4 RTC 模型的优缺点#

优势劣势
无锁设计,无跨核同步开销每个核必须完成全部处理,功能扩展性差
缓存友好,数据局部性极佳处理步骤多的场景下单核吞吐受限
编程模型简单,易于调试流表等共享状态需要特殊处理(RCU/读写锁)
线性扩展——核数翻倍吞吐翻倍负载不均时某些核空闲,利用率下降
Warning

RTC 模型的线性扩展有一个前提:流量必须被 RSS 均匀地分发到各个队列。如果流量特征导致 RSS 哈希倾斜(例如大量流量来自同一源 IP),某些 lcore 会过载而其他 lcore 空闲。此时需要考虑使用 Flow Director 或 rte_flow 做更精细的流量分发。

三、Pipeline 模型#

3.1 核心思想#

Pipeline(流水线)模型将数据包的处理流程拆分为多个阶段(Stage),每个阶段由不同的 lcore 负责。数据包在阶段之间通过 rte_ring 无锁队列传递。这与 CPU 指令流水线的思想一致——不同阶段并行工作,整体吞吐量取决于最慢的阶段。

3.2 Pipeline 架构图#

flowchart LR subgraph Stage1["Stage 1: 收包"] RX["lcore 0<br/>rx_burst()"] --> RING1["rte_ring<br/>rx_ring"] end subgraph Stage2["Stage 2: 解析+查表"] RING1 --> PARSE["lcore 1<br/>parse + lookup"] --> RING2["rte_ring<br/>tx_ring"] end subgraph Stage3["Stage 3: 发包"] RING2 --> TX["lcore 2<br/>tx_burst()"] end style Stage1 fill:#e3f2fd,stroke:#1565c0 style Stage2 fill:#e8f5e9,stroke:#2e7d32 style Stage3 fill:#fff3e0,stroke:#e65100 style RING1 fill:#fce4ec,stroke:#880e4f style RING2 fill:#fce4ec,stroke:#880e4f

3.3 代码实现#

// Pipeline 模型:三阶段流水线
// Stage 1: 收包 lcore
static int pipeline_rx_stage(void *arg)
{
uint16_t port_id = (uint16_t)(uintptr_t)arg;
struct rte_mbuf *bufs[BURST_SIZE];
struct rte_ring *rx_ring = rte_ring_lookup("rx_ring");
while (!force_quit) {
uint16_t nb_rx = rte_eth_rx_burst(port_id, 0, bufs, BURST_SIZE);
if (nb_rx == 0)
continue;
// 将收到的 mbuf 指针放入 ring,传递给下一阶段
uint16_t enqueued = rte_ring_sp_enqueue_burst(
rx_ring, (void *const *)bufs, nb_rx, NULL);
// 释放未入队的 mbuf(ring 满时背压)
for (uint16_t i = enqueued; i < nb_rx; i++)
rte_pktmbuf_free(bufs[i]);
}
return 0;
}
// Stage 2: 解析 + 查表 lcore
static int pipeline_process_stage(void *arg)
{
struct rte_mbuf *bufs[BURST_SIZE];
struct rte_ring *rx_ring = rte_ring_lookup("rx_ring");
struct rte_ring *tx_ring = rte_ring_lookup("tx_ring");
while (!force_quit) {
// 从 rx_ring 取出数据包
uint16_t nb_deq = rte_ring_sc_dequeue_burst(
rx_ring, (void **)bufs, BURST_SIZE, NULL);
if (nb_deq == 0)
continue;
// 处理每个数据包
for (uint16_t i = 0; i < nb_deq; i++) {
process_packet(bufs[i]);
}
// 将处理后的数据包放入 tx_ring
uint16_t enqueued = rte_ring_sp_enqueue_burst(
tx_ring, (void *const *)bufs, nb_deq, NULL);
for (uint16_t i = enqueued; i < nb_deq; i++)
rte_pktmbuf_free(bufs[i]);
}
return 0;
}
// Stage 3: 发包 lcore
static int pipeline_tx_stage(void *arg)
{
uint16_t port_id = (uint16_t)(uintptr_t)arg;
struct rte_mbuf *bufs[BURST_SIZE];
struct rte_ring *tx_ring = rte_ring_lookup("tx_ring");
while (!force_quit) {
uint16_t nb_deq = rte_ring_sc_dequeue_burst(
tx_ring, (void **)bufs, BURST_SIZE, NULL);
if (nb_deq == 0)
continue;
uint16_t nb_tx = rte_eth_tx_burst(port_id, 0, bufs, nb_deq);
for (uint16_t i = nb_tx; i < nb_deq; i++)
rte_pktmbuf_free(bufs[i]);
}
return 0;
}

3.4 RTC vs Pipeline 对比#

维度Run-to-CompletionPipeline
核间通信无(每核独立完成)有(rte_ring 传递)
缓存局部性极佳(数据不离开当前核)较差(数据跨核传递)
编程复杂度中(需管理 ring 和阶段间协议)
功能扩展差(每核重复全部逻辑)好(新功能加新阶段)
延迟低(单核直通)较高(经过多个 ring)
负载均衡依赖 RSS 均匀性可通过 ring 水位线动态调节
适用场景简单转发、L2/L3 交换复杂处理、多协议栈、防火墙
Note

实际生产中,两种模型经常混合使用。例如 OVS-DPDK 在入向使用 Pipeline(收包→解析→查表→动作),但在查表后对简单转发动作使用 RTC(直接在当前 lcore 发包),对需要复杂处理的动作(如隧道封装)才走后续 pipeline 阶段。

四、rte_ring 跨核通信#

4.1 为什么选择 rte_ring#

在 Pipeline 模型中,不同 lcore 之间需要传递数据包指针。传统的线程间通信方式(互斥锁、条件变量)在数据平面场景下性能不可接受——一个 pthread_mutex_lock() 可能需要数百纳秒,而数据包处理的目标是每包几十纳秒。

rte_ring 是 DPDK 提供的无锁环形缓冲区,专门用于高吞吐的跨核数据传递。在第 6 章中已经深入了 rte_ring 的内部实现,这里聚焦它在跨核通信场景中的使用模式。

4.2 SPSC Ring:Pipeline 的最佳选择#

在 Pipeline 模型中,两个相邻阶段之间通常是一对一的关系——一个 lcore 入队、一个 lcore 出队。这是典型的 SPSC(Single Producer Single Consumer)场景,也是 rte_ring 性能最高的模式。

// 创建 SPSC ring
struct rte_ring *ring;
ring = rte_ring_create("pipeline_ring", 1024,
rte_socket_id(), RING_F_SP_ENQ | RING_F_SC_DEQ);
// 生产者 lcore:批量入队
static int producer_lcore(void *arg)
{
struct rte_ring *ring = rte_ring_lookup("pipeline_ring");
struct rte_mbuf *bufs[BURST_SIZE];
while (!force_quit) {
uint16_t nb_rx = rte_eth_rx_burst(port_id, queue_id,
bufs, BURST_SIZE);
if (nb_rx == 0)
continue;
// SP 批量入队:无需 CAS,无需重试
uint16_t enqueued = rte_ring_sp_enqueue_burst(
ring, (void *const *)bufs, nb_rx, NULL);
// 处理未入队的包(背压)
for (uint16_t i = enqueued; i < nb_rx; i++)
rte_pktmbuf_free(bufs[i]);
}
return 0;
}
// 消费者 lcore:批量出队
static int consumer_lcore(void *arg)
{
struct rte_ring *ring = rte_ring_lookup("pipeline_ring");
struct rte_mbuf *bufs[BURST_SIZE];
while (!force_quit) {
// SC 批量出队:无需 CAS,无需重试
uint16_t nb_deq = rte_ring_sc_dequeue_burst(
ring, (void **)bufs, BURST_SIZE, NULL);
if (nb_deq == 0)
continue;
// 处理数据包
for (uint16_t i = 0; i < nb_deq; i++)
process_packet(bufs[i]);
}
return 0;
}

4.3 批量操作与背压#

rte_ring 的批量操作(burst)是性能的关键。批量入队/出队将多次指针拷贝合并为一次连续内存写入/读取,充分利用 CPU 缓存行的预取能力:

// 批量入队的内部优化(简化)
static __rte_always_inline unsigned int
rte_ring_sp_enqueue_burst(struct rte_ring *r, void *const *obj_table,
unsigned int n, unsigned int *free_space)
{
uint32_t prod_head = r->prod.head;
uint32_t cons_tail = r->cons.tail;
uint32_t free_count = r->size + cons_tail - prod_head;
// 实际入队数量 = min(请求数量, 可用空间)
uint32_t actual = (n > free_count) ? free_count : n;
if (actual == 0) {
if (free_space) *free_space = 0;
return 0;
}
// 批量拷贝指针——连续内存写入,缓存行友好
const uint32_t mask = r->mask;
uint32_t idx = prod_head & mask;
// 处理环形回绕:可能需要分两段拷贝
if (idx + actual <= r->size) {
// 无回绕:一次 memcpy
memcpy(&r->ring[idx], obj_table, actual * sizeof(void *));
} else {
// 有回绕:分两段拷贝
uint32_t first_part = r->size - idx;
memcpy(&r->ring[idx], obj_table, first_part * sizeof(void *));
memcpy(&r->ring[0], &obj_table[first_part],
(actual - first_part) * sizeof(void *));
}
// 内存屏障 + 更新指针
rte_smp_wmb();
r->prod.head = prod_head + actual;
r->prod.tail = prod_head + actual;
if (free_space) *free_space = free_count - actual;
return actual;
}

4.4 Cache-line 对齐#

rte_ring 的性能优化中,cache-line 对齐是至关重要的一环。生产者的 head/tail 和消费者的 head/tail 被放在不同的 cache-line 上,避免 false sharing:

// rte_ring 结构体中的 cache-line 对齐(DPDK 源码简化)
struct rte_ring {
// ... 其他字段 ...
alignas(RTE_CACHE_LINE_SIZE) struct rte_ring_headtail prod;
// prod 独占一个 cache-line(64 字节)
alignas(RTE_CACHE_LINE_SIZE) struct rte_ring_headtail cons;
// cons 独占一个 cache-line(64 字节)
};

如果 prodcons 在同一个 cache-line 上,生产者更新 prod.head 时会使消费者核心上的 cache-line 失效,反之亦然。这就是 false sharing——虽然两者访问的是不同变量,但因为它们在同一 cache-line 上,导致不必要的缓存弹跳。通过 cache-line 对齐,生产者和消费者各自操作独立的 cache-line,互不干扰。

Warning

在 NUMA 系统中,rte_ring 必须创建在正确的 NUMA 节点上。如果 ring 分配在 NUMA 节点 0,但生产者在节点 1 的 CPU 上运行,每次入队操作都需要跨节点访问内存,延迟从 ~50ns 增加到 ~150ns。始终使用 rte_socket_id() 获取当前 lcore 所在的 NUMA 节点,并在该节点上创建 ring。

五、原子操作与内存屏障#

5.1 为什么需要原子操作#

在多核环境中,即使采用了 RTC 模型减少核间共享,某些场景仍然需要跨核访问共享数据——例如统计计数器、流表更新、全局配置。这些场景下,简单的读写操作不是原子的:

// 非原子的计数器更新——多核下会丢失更新
uint64_t packet_count; // 全局计数器
// lcore 0 和 lcore 1 同时执行:
packet_count++; // 编译为:load → add → store,不是原子操作
// 可能的结果:两个核都 load 了相同的值,各自 +1 后 store,丢失一次更新

5.2 rte_atomic 操作#

DPDK 提供了 rte_atomic 系列操作,封装了编译器屏障和 CPU 原子指令:

#include <rte_atomic.h>
// 原子计数器
static rte_atomic64_t total_packets;
// 原子递增
rte_atomic64_inc(&total_packets);
// 原子读取
uint64_t count = rte_atomic64_read(&total_packets);
// 原子比较并交换(CAS)
rte_atomic64_t version;
int64_t expected = 0;
int success = rte_atomic64_cmpset(
(volatile uint64_t *)&version, // 目标地址
(uint64_t)expected, // 期望值
(uint64_t)1 // 新值
);
// 如果 version == expected,则 version = 1,返回 1
// 否则不做任何操作,返回 0

5.3 内存屏障#

内存屏障(Memory Barrier/Fence)是多核编程中最微妙也最重要的概念。现代 CPU 和编译器都会对指令进行重排序,单线程下这不会影响正确性,但多线程下可能导致另一个核心看到不一致的内存状态。

DPDK 提供了不同粒度的内存屏障:

// 编译器屏障:阻止编译器重排序,但不影响 CPU
rte_compiler_barrier();
// SMP 内存屏障:在所有核心之间建立全局顺序
rte_smp_mb(); // 全屏障(读+写)
rte_smp_rmb(); // 读屏障
rte_smp_wmb(); // 写屏障
// I/O 内存屏障:用于设备寄存器访问
rte_io_mb();
rte_io_rmb();
rte_io_wmb();

屏障使用场景示例——rte_ring 的两阶段提交:

// rte_ring MPMC enqueue 中的屏障使用
static __rte_always_inline unsigned int
__rte_ring_move_prod_head(struct rte_ring *r, unsigned int is_sp,
unsigned int n, uint32_t *old_head,
uint32_t *new_head, uint32_t *free_entries)
{
do {
// 1. 读取当前 prod.head
*old_head = r->prod.head;
// 2. 编译器屏障:确保 old_head 的读取不会被重排到后面
rte_smp_rmb();
// 3. 读取 cons.tail(消费者已确认的出队位置)
*free_entries = r->size + r->cons.tail - *old_head;
if (n > *free_entries)
return 0;
*new_head = *old_head + n;
// 4. CAS 更新 prod.head(包含完整的内存屏障)
} while (rte_atomic32_cmpset(&r->prod.head, *old_head, *new_head) == 0);
return n;
}

5.4 C11 Atomics:现代 DPDK 的选择#

从 DPDK 21.11 开始,官方推荐使用 C11 标准的原子操作替代 rte_atomic。C11 原子提供了更清晰的内存序语义:

#include <stdatomic.h>
// C11 原子变量
static atomic_uint_fast64_t total_packets;
// 原子递增(relaxed 序:无屏障,最快)
atomic_fetch_add_explicit(&total_packets, 1, memory_order_relaxed);
// 原子读取(relaxed 序:仅保证原子性,不保证顺序)
uint64_t count = atomic_load_explicit(&total_packets, memory_order_relaxed);
// 原子 CAS(acquire-release 序:保证 CAS 前后的读写顺序)
uint64_t expected = 0;
bool success = atomic_compare_exchange_strong_explicit(
&version, &expected, 1,
memory_order_acq_rel, // 成功时的内存序
memory_order_acquire // 失败时的内存序
);

5.5 无锁模式:单生产者计数器#

在数据平面中,最常见的统计场景是”每个 lcore 独立计数,偶尔汇总”。这种场景下,不需要原子操作——每个 lcore 维护一个 per-lcore 计数器,汇总时只需读取所有 lcore 的计数器求和:

// Per-lcore 计数器——完全无锁
static RTE_DEFINE_PER_LCORE(uint64_t, local_packet_count);
// 每个 lcore 独立更新自己的计数器(无原子操作,无屏障)
local_packet_count++;
// 汇总:Master lcore 读取所有 Worker 的计数器
uint64_t get_total_packets(void)
{
uint64_t total = 0;
unsigned int lcore_id;
RTE_LCORE_FOREACH_WORKER(lcore_id) {
total += RTE_PER_LCORE_BY_ID(lcore_id, local_packet_count);
}
total += local_packet_count; // 加上 Master 的
return total;
}
Note

Per-lcore 计数器是 DPDK 中最推荐的统计方式。rte_eth_xstats_get() 返回的统计信息就是 per-queue 的,每个 lcore 只更新自己队列的计数器,零同步开销。只有在需要全局实时一致视图时(如 QoS 限速),才需要原子操作。

六、RCU 机制:rte_rcu_qsbr#

6.1 读多写少的困境#

在数据平面中,流表(Flow Table)、路由表(FIB)、ACL 规则等数据结构是典型的”读多写少”场景——每个数据包都要查表,但表项的增删改相对稀少。如果用读写锁保护这些数据结构,读操作虽然可以并发,但写操作会阻塞所有读者,导致数据平面抖动。

RCU(Read-Copy-Update)是一种优雅的解决方案:读者无锁访问,写者创建副本修改,等所有读者离开临界区后回收旧数据。Linux 内核广泛使用 RCU 保护各种数据结构,DPDK 也提供了用户态 RCU 实现——rte_rcu_qsbr

6.2 静默状态与宽限期#

RCU 的核心概念有两个:

  • 静默状态(Quiescent State, QS):读者声明”我当前不在访问 RCU 保护的数据”。在 DPDK 中,每个 lcore 在一轮处理循环的间隙(两次 rx_burst() 之间)报告静默状态
  • 宽限期(Grace Period):从写者发布新版本数据开始,到所有读者都至少报告了一次静默状态的时间段。宽限期结束后,旧数据可以安全回收
sequenceDiagram participant W as 写者线程 participant R1 as 读者 lcore 0 participant R2 as 读者 lcore 1 participant R3 as 读者 lcore 2 W->>W: 创建新版本数据 W->>W: 原子替换指针(发布新版本) Note over W,R3: 宽限期开始 R1->>R1: 可能仍在读旧版本 R2->>R2: 可能仍在读旧版本 R3->>R3: 可能仍在读旧版本 R1->>R1: 报告 QS(离开临界区) R2->>R2: 报告 QS(离开临界区) R3->>R3: 报告 QS(离开临界区) Note over W: 宽限期结束:所有读者都已报告 QS W->>W: 安全回收旧版本数据

6.3 rte_rcu_qsbr API 与代码示例#

#include <rte_rcu_qsbr.h>
#define MAX_LCORES 64
// RCU 变量
struct rte_rcu_qsbr *qsv;
// 初始化 RCU
int init_rcu(void)
{
size_t sz = rte_rcu_qsbr_get_memsize(MAX_LCORES);
qsv = rte_zmalloc("rcu", sz, RTE_CACHE_LINE_SIZE);
if (qsv == NULL)
return -ENOMEM;
return rte_rcu_qsbr_init(qsv, MAX_LCORES);
}
// 读者 lcore:注册 + 报告静默状态
static int reader_lcore(void *arg)
{
unsigned int lcore_id = rte_lcore_id();
// 注册当前 lcore 为 RCU 读者
rte_rcu_qsbr_thread_register(qsv, lcore_id);
rte_rcu_qsbr_thread_online(qsv, lcore_id);
while (!force_quit) {
// 读取 RCU 保护的数据(无锁)
struct flow_entry *flow = lookup_flow_table(packet);
if (flow) {
// 使用 flow 数据——在此期间,写者不会回收 flow
process_flow(flow, packet);
}
// 报告静默状态:声明"我已离开临界区"
rte_rcu_qsbr_quiescent(qsv, lcore_id);
}
rte_rcu_qsbr_thread_offline(qsv, lcore_id);
rte_rcu_qsbr_thread_unregister(qsv, lcore_id);
return 0;
}
// 写者线程:更新流表
void update_flow_entry(struct flow_entry *old_flow,
struct flow_entry *new_flow)
{
// 1. 创建新版本(copy + modify)
// new_flow 已经在外部创建并初始化
// 2. 原子替换指针
__atomic_store_n(&flow_table[old_flow->hash], new_flow,
__ATOMIC_RELEASE);
// 3. 等待宽限期:所有读者都已离开旧数据的临界区
rte_rcu_qsbr_synchronize(qsv, RTE_QSBR_THRID_INVALID, 0);
// 4. 安全回收旧数据
rte_free(old_flow);
}

6.4 宽限期检查的优化#

rte_rcu_qsbr_synchronize() 可以阻塞等待宽限期结束,但在数据平面中,阻塞写者线程是不可接受的。DPDK 提供了异步回调机制:

// 异步 RCU 回调:宽限期结束后自动回收旧数据
struct rte_rcu_qsbr_dq *dq;
// 创建延迟回收队列
struct rte_rcu_qsbr_dq_parameters params = {
.name = "flow_rcu_dq",
.size = 1024,
.esize = sizeof(struct flow_entry *),
.qsv = qsv,
.trigger_reclaim_limit = 128, // 队列中超过 128 个待回收项时触发
.reclaim_max = 64, // 每次最多回收 64 个
};
dq = rte_rcu_qsbr_dq_create(&params);
// 写者:将旧数据放入延迟回收队列(不阻塞)
void update_flow_async(struct flow_entry *old_flow,
struct flow_entry *new_flow)
{
// 原子替换指针
__atomic_store_n(&flow_table[old_flow->hash], new_flow,
__ATOMIC_RELEASE);
// 将旧数据入队,由 RCU 在宽限期后自动回收
rte_rcu_qsbr_dq_enqueue(dq, &old_flow, 1);
}
// 定期调用(可在任意 lcore 的主循环中)
void reclaim_rcu_resources(void)
{
rte_rcu_qsbr_dq_reclaim(dq, UINT_MAX, NULL, NULL, NULL);
}
Warning

RCU 不能保护写者之间的并发——同一时刻只能有一个写者修改 RCU 保护的数据结构。如果多个线程需要并发写入,必须额外使用互斥锁或自旋锁保护写路径。RCU 解决的是”读者无锁 + 写者不阻塞读者”的问题,而不是”写者并发”的问题。

七、Eventdev:事件驱动框架#

7.1 从轮询到事件驱动#

前面讨论的 RTC 和 Pipeline 模型都有一个共同特点:lcore 与队列的绑定关系是静态的。在 RTC 模型中,lcore 0 处理 Queue 0,lcore 1 处理 Queue 1;在 Pipeline 模型中,lcore 0 收包,lcore 1 查表,lcore 2 发包。这种静态绑定在负载均衡上存在天然缺陷——如果 Queue 0 的流量远高于 Queue 1,lcore 0 过载而 lcore 1 空闲。

Eventdev(Event Device)是 DPDK 提供的事件驱动框架,它将”数据包到达”抽象为”事件”,由调度器(Scheduler)动态地将事件分发到空闲的 Worker lcore 上。Worker 不绑定特定队列,而是从调度器获取下一个待处理的事件——哪个 lcore 空闲,事件就分给谁。

7.2 Eventdev 核心概念#

Eventdev 涉及以下核心概念:

概念说明
Event事件载体,可以是数据包(RTE_EVENT_TYPE_ETHDEV)、定时器(RTE_EVENT_TYPE_TIMER)或自定义事件
Queue事件队列,事件在 Worker 处理前暂存于此
Port事件端口,Worker 通过 Port 从 Queue 获取事件或向 Queue 提交事件
Scheduler调度器,负责将事件从 Queue 分发到 Port
PMDEventdev 驱动,可以是软件实现(event_sw)或硬件实现(event_dlb2

7.3 调度模式#

Eventdev 支持三种调度模式,决定了事件在不同 Worker 之间的分发策略:

Atomic(原子调度):同一 flow 的事件始终被分发到同一个 Worker,保证同一 flow 的事件按序处理。这是最常用的模式,适用于需要流级一致性的场景(如流表查找、状态防火墙)。

Ordered(有序调度):同一 flow 的事件可以被分发到不同 Worker 并行处理,但在最终输出时恢复原始顺序。适用于计算密集型处理(如深度包检测、加密),需要并行加速但要求输出有序。

Parallel(并行调度):事件可以被分发到任意 Worker,不保证顺序。适用于无状态处理(如统计、日志),每个事件独立处理,不需要流级一致性。

flowchart TB subgraph 事件源["事件源"] NIC["网卡 RX"] --> |"RTE_EVENT_TYPE_ETHDEV"| EQ1["Event Queue 0<br/>Atomic"] NIC --> EQ2["Event Queue 1<br/>Ordered"] TIMER["定时器"] --> |"RTE_EVENT_TYPE_TIMER"| EQ3["Event Queue 2<br/>Parallel"] end subgraph 调度器["Eventdev Scheduler"] SCHED["调度器<br/>根据调度模式分发事件"] end subgraph Worker["Worker lcores"] W1["lcore 0<br/>event_dequeue()"] W2["lcore 1<br/>event_dequeue()"] W3["lcore 2<br/>event_dequeue()"] end EQ1 --> SCHED EQ2 --> SCHED EQ3 --> SCHED SCHED --> W1 SCHED --> W2 SCHED --> W3 W1 --> |"event_enqueue()"| SCHED W2 --> |"event_enqueue()"| SCHED W3 --> |"event_enqueue()"| SCHED style 事件源 fill:#e3f2fd,stroke:#1565c0 style 调度器 fill:#fff3e0,stroke:#e65100 style Worker fill:#e8f5e9,stroke:#2e7d32

7.4 硬件与软件 PMD#

Eventdev 的调度器可以由硬件或软件实现:

PMD类型特点
event_dlb2硬件(Intel DSA/DLB2)调度在硬件中完成,延迟极低,吞吐极高
event_cn10k硬件(Marvell CN10K)集成 SSO 调度器,与以太网设备紧密耦合
event_sw软件纯软件实现,可在任何平台上运行,性能较低
event_dsw软件(分布式)分布式软件调度器,避免中心化瓶颈

硬件 Eventdev PMD 将调度逻辑卸载到专用硬件,可以显著降低调度延迟。但软件 PMD 提供了更好的灵活性和可移植性,适合开发调试阶段使用。

7.5 Eventdev 代码示例#

#include <rte_eventdev.h>
#define NUM_EVENTS 32
// Eventdev 初始化
int setup_eventdev(uint8_t dev_id, uint16_t nb_ports, uint16_t nb_queues)
{
struct rte_event_dev_config dev_conf = {
.nb_events_limit = 4096,
.nb_event_queues = nb_queues,
.nb_event_ports = nb_ports,
.nb_event_queue_flows = 1024,
.nb_event_port_dequeue_depth = 64,
.nb_event_port_enqueue_depth = 64,
};
int ret = rte_event_dev_configure(dev_id, &dev_conf);
if (ret < 0)
return ret;
// 配置事件队列
struct rte_event_queue_conf queue_conf = {
.schedule_type = RTE_SCHED_TYPE_ATOMIC,
.priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
.nb_atomic_flows = 1024,
.nb_atomic_order_sequences = 1024,
};
for (uint16_t i = 0; i < nb_queues; i++) {
rte_event_queue_setup(dev_id, i, &queue_conf);
}
// 配置事件端口
struct rte_event_port_conf port_conf = {
.new_event_threshold = 1024,
.dequeue_depth = 64,
.enqueue_depth = 64,
};
for (uint16_t i = 0; i < nb_ports; i++) {
rte_event_port_setup(dev_id, i, &port_conf);
// 将所有队列链接到该端口
uint8_t queue_ids[nb_queues];
for (uint16_t q = 0; q < nb_queues; q++)
queue_ids[q] = q;
rte_event_port_link(dev_id, i, queue_ids, NULL, nb_queues);
}
return rte_event_dev_start(dev_id);
}
// RX Adapter:将网卡收包转换为事件
int setup_rx_adapter(uint8_t eventdev_id, uint16_t eth_port_id)
{
struct rte_event_eth_rx_adapter_conf conf = {
.event_buf_size = 1024,
};
uint8_t rx_adapter_id;
rte_event_eth_rx_adapter_create_ext(&rx_adapter_id, eventdev_id,
rx_adapter_conf_cb, &conf);
struct rte_event_eth_rx_adapter_queue_conf queue_conf = {
.rx_queue_flags = 0,
.ev = {
.queue_id = 0, // 事件进入 Queue 0
.priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
.sched_type = RTE_SCHED_TYPE_ATOMIC,
},
.servicing_weight = 1, // 轮询模式
};
rte_event_eth_rx_adapter_queue_add(rx_adapter_id, eth_port_id,
-1, &queue_conf);
rte_event_eth_rx_adapter_start(rx_adapter_id);
return 0;
}
// Worker lcore:事件驱动处理
static int eventdev_worker(void *arg)
{
uint8_t dev_id = (uint8_t)(uintptr_t)arg;
uint8_t port_id = rte_lcore_id(); // 每个 lcore 使用自己的 port
struct rte_event events[NUM_EVENTS];
while (!force_quit) {
// 从调度器获取事件
uint16_t nb_deq = rte_event_dequeue_burst(dev_id, port_id,
events, NUM_EVENTS, 0);
if (nb_deq == 0)
continue;
for (uint16_t i = 0; i < nb_deq; i++) {
struct rte_event *ev = &events[i];
struct rte_mbuf *mbuf = ev->mbuf;
// 处理数据包
process_packet(mbuf);
// 如果需要继续下一阶段处理,重新入队
if (needs_further_processing(mbuf)) {
ev->queue_id = 1; // 发送到下一个队列
rte_event_enqueue_burst(dev_id, port_id, ev, 1);
} else {
// 处理完成,释放 mbuf
rte_pktmbuf_free(mbuf);
}
}
}
return 0;
}

7.6 Eventdev vs RTC/Pipeline#

维度RTCPipelineEventdev
负载均衡依赖 RSS,可能不均静态分配,可能不均动态调度,自动均衡
核间通信rte_ringEventdev 内部调度
编程模型最简单中等较复杂(需理解调度模式)
延迟最低中等调度器引入额外延迟
吞吐单核高,多核依赖 RSS受最慢阶段限制动态均衡,整体吞吐高
硬件加速硬件调度器(DLB2/CN10K)
适用场景简单转发多阶段处理负载波动大、需动态均衡
Note

Eventdev 不是 RTC/Pipeline 的替代品,而是补充。在负载稳定、RSS 分流均匀的场景下,RTC 的性能仍然最优——因为它完全没有调度开销。Eventdev 的优势在于负载波动大、流量模式复杂的场景,如电信级 vEPC/UPF,不同用户的流量特征差异巨大,静态绑定无法实现有效均衡。

八、动手实践#

实践 1:多核 RTC 转发器#

构建一个 4 核 RTC 转发器,每个 lcore 处理一个 RX 队列,实现 L2 转发。观察 RSS 哈希是否将流量均匀分发到各个队列:

# 启动 testpmd,4 个 lcore,4 个队列
dpdk-testpmd -l 0-4 -n 4 -- -i --rxq=4 --txq=4 --rss-ip
# testpmd 中查看各队列收包统计
show port xstats all
# 使用 pktgen 发送不同源 IP 的流量,观察 RSS 分发
# 预期:4 个队列的收包数大致相等

验证要点

  • 各队列收包数是否均匀(偏差 < 10%)
  • 单核吞吐量是否与核数线性增长
  • perf top 观察各 lcore 的 CPU 利用率是否接近 100%

实践 2:Pipeline 模型与性能对比#

将实践 1 的 RTC 转发器改造为 3 阶段 Pipeline(收包→处理→发包),使用 rte_ring 传递数据包。对比两种模型的吞吐量和延迟:

// 创建跨核 ring
struct rte_ring *rx_ring = rte_ring_create("rx_ring", 4096,
rte_socket_id(), RING_F_SP_ENQ | RING_F_SC_DEQ);
struct rte_ring *tx_ring = rte_ring_create("tx_ring", 4096,
rte_socket_id(), RING_F_SP_ENQ | RING_F_SC_DEQ);
// 启动 3 个 lcore 分别执行收包、处理、发包
rte_eal_remote_launch(rx_stage, (void *)(uintptr_t)port_id, 1);
rte_eal_remote_launch(process_stage, NULL, 2);
rte_eal_remote_launch(tx_stage, (void *)(uintptr_t)port_id, 3);

验证要点

  • Pipeline 模型的吞吐量是否低于 RTC(因 ring 传递开销)
  • Pipeline 模型的延迟是否高于 RTC(经过多个 ring)
  • 调整 ring 大小(256/1024/4096)对性能的影响

实践 3:RCU 流表更新#

使用 rte_rcu_qsbr 实现一个可动态更新的流表。Reader lcore 查表转发,Writer 线程定期添加/删除流表项:

// 流表结构(RCU 保护)
struct flow_table_entry {
uint32_t src_ip;
uint32_t dst_ip;
uint16_t src_port;
uint16_t dst_port;
uint8_t proto;
uint16_t output_port;
} __rte_cache_aligned;
// 查表(读者,无锁)
static inline struct flow_table_entry *
flow_lookup(struct flow_table *tbl, struct rte_mbuf *pkt)
{
// 计算哈希
uint32_t hash = flow_hash(pkt);
// 读取指针(原子 load,RCU 保护)
struct flow_table_entry *entry =
__atomic_load_n(&tbl->entries[hash], __ATOMIC_ACQUIRE);
return entry;
}
// 更新流表(写者,RCU 保护)
void flow_table_update(struct flow_table *tbl, uint32_t hash,
struct flow_table_entry *new_entry)
{
struct flow_table_entry *old_entry =
__atomic_exchange_n(&tbl->entries[hash], new_entry,
__ATOMIC_ACQ_REL);
if (old_entry) {
// 等待宽限期后回收旧表项
rte_rcu_qsbr_synchronize(qsv, RTE_QSBR_THRID_INVALID, 0);
rte_free(old_entry);
}
}

验证要点

  • 流表更新期间,Reader lcore 是否出现延迟抖动
  • 使用 rte_rcu_qsbr_dq 异步回收 vs 同步 synchronize 的性能差异
  • 宽限期的典型时长(与 lcore 数量和处理循环频率相关)

实践 4:Eventdev 原子调度#

使用 Eventdev 的软件 PMD(event_dsw)构建一个事件驱动的转发器。对比 Atomic 和 Parallel 两种调度模式下的吞吐量和延迟:

# 使用 event_dsw PMD
# 在应用启动时指定 --vdev=event_dsw0
dpdk-app --vdev=event_dsw0 -l 0-4 -n 4 -- -p 0x3

验证要点

  • Atomic 模式下,同一 flow 的事件是否始终由同一 Worker 处理
  • Parallel 模式下,吞吐量是否高于 Atomic(无流级串行化)
  • 软件调度器的 CPU 开销占比(perf top 观察调度器函数)

小结#

本章系统剖析了 DPDK 的多核与并发模型,这是从单核性能到多核扩展的关键跨越。核心要点如下:

  1. lcore 模型:DPDK 通过 EAL 将 pthread 绑定到 CPU 核心,Master lcore 负责初始化和管理,Worker lcore 执行数据平面处理。rte_eal_mp_remote_launch() 是任务分发的核心 API。

  2. Run-to-Completion:每个 lcore 独立完成收包→处理→发包的全流程,无锁无同步,缓存局部性极佳,是简单转发场景的最优选择。线性扩展的前提是 RSS 均匀分流。

  3. Pipeline:将处理流程拆分为多个阶段,通过 rte_ring 传递数据包。功能扩展性好,适合复杂处理场景,但 ring 传递引入额外延迟和缓存失效。

  4. rte_ring 跨核通信:SPSC 模式是 Pipeline 的最佳选择,批量操作和 cache-line 对齐是性能关键。NUMA 感知是必须的——跨节点 ring 访问延迟增加 3 倍。

  5. 原子操作与内存屏障rte_atomic 和 C11 atomics 提供原子操作,内存屏障保证多核可见性。Per-lcore 计数器是最推荐的统计方式,完全无锁。

  6. RCU 机制rte_rcu_qsbr 实现用户态 RCU,读者无锁访问,写者等宽限期后回收旧数据。异步回调(rte_rcu_qsbr_dq)避免写者阻塞,是生产环境的首选。

  7. Eventdev:事件驱动框架通过动态调度实现负载均衡,支持 Atomic/Ordered/Parallel 三种调度模式。硬件 PMD(DLB2/CN10K)提供极低调度延迟,适合电信级场景。

选择哪种模型,取决于你的场景:

  • 简单转发、负载均匀 → RTC
  • 多阶段处理、功能复杂 → Pipeline
  • 负载波动大、需动态均衡 → Eventdev
  • 读多写少的共享数据 → RCU

参考资料#

支持与分享

如果这篇文章对你有帮助,欢迎支持作者或分享给更多人

部分信息可能已经过时

相关文章 智能推荐
1
OVS-DPDK 与虚拟交换
高性能网络 深入 OVS-DPDK 与虚拟交换——OVS-DPDK 架构与内核 OVS 对比、dpdkvhostuser 端口类型、vhost-user 协议与共享 virtio 环、virtio 前后端、VM-to-VM 交换路径、流分类(EMC/DPCLS/megaflows)、性能调优——掌握云环境虚拟网络加速的完整技术栈。
2
DPDK 架构全景与核心概念
高性能网络 深入 DPDK 整体架构——EAL 抽象层初始化流程、内存管理概览、轮询模式驱动概念、环形缓冲区与 mbuf、lcore 线程模型、Meson 构建系统,以及从零编写第一个 DPDK 应用——helloworld 全代码走读。
3
SmartNIC 与 DPU
高性能网络 深入 SmartNIC 与 DPU——硬件卸载概念与收益、SmartNIC 架构(固定功能 vs 可编程)、DPU 产品矩阵(NVIDIA BlueField/AMD Pensando/Intel IPU)、OVS 硬件卸载(tc/rte_flow/ASAP²)、P4 编程入门——掌握硬件加速网络的完整技术栈。
4
SPDK 与存储旁通
高性能网络 深入 SPDK 与存储旁通——SPDK 架构与 DPDK EAL 共享、用户态 NVMe 驱动(MMIO/SQ/CQ)、vhost-user 协议与共享内存、bdev 抽象层与模块、blobstore 持久化对象存储、iSCSI/NVMe-oF Target——掌握用户态存储的完整技术栈。
5
VPP 与 FD.io 数据平面
高性能网络 深入 VPP 与 FD.io 数据平面——矢量包处理(Vector Packet Processing)的 I-Cache 友好设计、插件框架、图节点实现与类型、CLI 与 binary API、预取优化与批量处理、VPP + DPDK 输入节点——掌握下一代数据平面的架构与编程。