mobile wallpaper 1mobile wallpaper 2mobile wallpaper 3mobile wallpaper 4
4602 字
12 分钟
内核同步机制
2024-10-19

第 4 章:进程调度中,我们看到了多个进程如何在 CPU 上并发执行;在第 13 章:中断与软中断中,我们了解了中断如何在任意时刻打断当前执行流。这些机制带来了一个根本性的问题:当多个执行流同时访问同一份数据时,如何保证数据的一致性?

这就是内核同步机制要解决的核心问题。与用户态程序不同,内核中的并发来源更加复杂——SMP 多核并行、中断随时打断、内核抢占随时切换——每一种并发源都可能在不经意间破坏共享数据的完整性。Linux 内核为此提供了一套层次分明、各有所长的同步原语:从最轻量的原子操作和内存屏障,到忙等的自旋锁,再到可睡眠的互斥锁,再到读无锁的 RCU,每一种原语都在特定场景下发挥不可替代的作用。

本章将从并发的根源出发,逐层剖析 Linux 内核同步机制的设计与实现。

一、并发来源:为什么内核需要同步#

1.1 四大并发源#

内核中的共享数据可能被以下四种执行流并发访问:

并发源描述典型场景
SMP 多核多个 CPU 同时执行内核代码,真正意义上的并行CPU 0 和 CPU 1 同时修改全局链表
中断中断处理程序打断当前执行流,异步访问共享数据网卡中断修改 sk_buff 链表
抢占内核抢占导致当前执行流被切换,另一个进程访问同一数据进程 A 持有链表指针时被抢占,进程 B 修改链表
软中断 / Tasklet软中断在开中断环境下执行,可被中断打断NET_RX 软中断处理网络包时被定时器中断打断
flowchart TB subgraph 并发来源 A["SMP 多核并行<br/>CPU 0 与 CPU 1 同时执行"] B["硬件中断<br/>异步打断当前执行流"] C["内核抢占<br/>高优先级进程抢占"] D["软中断 / Tasklet<br/>Bottom Half 并发执行"] end subgraph 共享数据 S["全局链表 / 计数器 / 状态标志"] end A --> S B --> S C --> S D --> S subgraph 同步原语 L["锁 / 原子操作 / RCU / percpu"] end S --> L

1.2 临界区与竞态条件#

临界区(Critical Section) 是访问共享数据的代码段。当多个执行流同时进入临界区时,执行结果取决于它们的交错顺序——这就是竞态条件(Race Condition)

一个经典的例子是链表插入:

// 无保护的链表插入——存在竞态条件
void list_add_unsafe(struct list_head *new, struct list_head *head) {
new->next = head->next; // 步骤 1
head->next->prev = new; // 步骤 2
head->next = new; // 步骤 3
new->prev = head; // 步骤 4
}

如果 CPU 0 执行完步骤 1 后被 CPU 1 抢先执行了完整的插入操作,链表就会被破坏。同步机制的核心目标就是保证临界区的互斥访问

1.3 同步策略的选择维度#

选择同步原语时需要考虑以下维度:

维度选项影响
能否睡眠可睡眠 / 不可睡眠中断上下文中不能睡眠
争用程度低争用 / 高争用低争用用原子操作,高争用用锁
读写比例读多写少 / 读写均衡读多写少用 RCU 或读写锁
持锁时间短 / 长短用自旋锁,长用互斥锁
上下文进程上下文 / 中断上下文中断上下文只能用自旋锁
Note

内核中有一条铁律:中断上下文(包括软中断、Tasklet、定时器)绝对不能睡眠。睡眠意味着自愿让出 CPU,但中断上下文没有独立的 task_struct,调度器无法将其重新调度执行——这将导致系统死锁或崩溃。

二、原子操作与内存屏障#

2.1 atomic_t:不可分割的操作#

原子操作是同步的基石——所有更高级的同步原语都建立在原子操作之上。Linux 内核用 atomic_t(32 位)和 atomic64_t(64 位)封装原子变量:

include/linux/types.h
typedef struct {
int counter;
} atomic_t;
// include/asm-generic/atomic.h
#define atomic_read(v) READ_ONCE((v)->counter)
#define atomic_set(v, i) WRITE_ONCE((v)->counter, (i))
// 架构相关实现,以 x86 为例
static __always_inline void atomic_add(int i, atomic_t *v)
{
asm volatile(LOCK_PREFIX "addl %1,%0"
: "+m" (v->counter)
: "ir" (i) : "memory");
}
static __always_inline int atomic_add_return(int i, atomic_t *v)
{
return i + xadd(&v->counter, i);
}

LOCK_PREFIX 在 SMP 系统上展开为 lock 前缀,触发 CPU 的**缓存锁定(Cache Locking)**机制——通过锁定缓存行而非总线,实现比早期总线锁更高效的原子操作。

2.2 CMPXCHG:比较并交换#

cmpxchg(Compare-And-Swap)是无锁编程的核心原语,也是许多锁实现的底层支撑:

arch/x86/include/asm/cmpxchg.h
static __always_inline unsigned long cmpxchg(volatile void *ptr,
unsigned long old, unsigned long new, int size)
{
// 原子地比较 *ptr 与 old
// 如果相等,将 new 写入 *ptr 并返回 old
// 如果不等,返回 *ptr 的当前值
}

cmpxchg 的语义是原子的条件写入:只有在值未被他人修改时才执行写入。这种”乐观并发”模式被广泛用于无锁数据结构和锁的获取:

// 自旋锁获取的简化实现
static __always_inline int ticket_lock_acquire(arch_spinlock_t *lock)
{
// 原子地获取并递增 ticket
unsigned int ticket = atomic_inc_return(&lock->tickets.tail);
// 忙等直到轮到自己
while (atomic_read(&lock->tickets.head) != ticket)
cpu_relax();
}

2.3 内存屏障:防止指令重排#

现代 CPU 和编译器都会对指令进行重排序以提升性能,但重排序可能破坏同步逻辑的正确性。内存屏障(Memory Barrier)用于约束重排序:

屏障作用典型用途
smp_mb()全屏障,禁止前后所有读写重排通用同步
smp_rmb()读屏障,禁止前面的读与后面的读重排读取端同步
smp_wmb()写屏障,禁止前面的写与后面的写重排写入端同步
smp_store_release()Release 语义写消息发布
smp_load_acquire()Acquire 语义读消息消费

一个经典的例子是生产者-消费者模式:

int data = 0;
bool ready = false;
// 生产者(CPU 0)
void producer(void) {
data = 42; // 写入数据
smp_store_release(&ready, true); // Release 写:保证 data 的写入在 ready 之前可见
}
// 消费者(CPU 1)
void consumer(void) {
if (smp_load_acquire(&ready)) { // Acquire 读:保证读取 data 在 ready 之后
assert(data == 42); // 保证成立
}
}
Important

smp_* 屏障只在 SMP 系统上生效,UP(单核)系统上编译为空操作。如果需要在中断与进程之间保证顺序(即使在 UP 上),应使用 mb()/rmb()/wmb() 等无条件屏障。

三、自旋锁(Spinlock)#

3.1 忙等锁的基本原理#

自旋锁是最基本的内核锁——当锁被持有时,请求者不会睡眠,而是在一个紧凑循环中不断检查锁状态(“自旋”):

kernel/locking/spinlock.c
// 自旋锁的核心逻辑(简化)
void __raw_spin_lock(raw_spinlock_t *lock)
{
// 1. 关闭内核抢占
preempt_disable();
// 2. 死循环等待锁释放
while (atomic_xchg(&lock->slock, 1) != 0)
cpu_relax(); // PAUSE 指令,降低功耗并提示超线程
}

自旋锁的关键约束

  1. 不能睡眠:持锁期间绝对不能调用任何可能睡眠的函数(kmalloc(GFP_KERNEL)copy_from_usermutex_lock 等),因为睡眠会导致其他 CPU 无限期自旋
  2. 持锁时间必须短:自旋期间 CPU 在空转,持锁时间越长浪费越多
  3. 可用于中断上下文:因为不睡眠,是中断上下文中唯一可用的锁类型

3.2 spin_lock 的变体#

不同场景需要不同的自旋锁变体,核心区别在于是否需要禁用中断

变体禁用抢占禁用中断适用场景
spin_lock()进程上下文,共享数据不被中断访问
spin_lock_irq()进程上下文,共享数据被中断访问
spin_lock_irqsave()(保存/恢复中断状态)不确定当前中断是否已禁用
spin_lock_bh()禁用软中断共享数据被软中断访问
// 进程上下文与中断共享数据的正确用法
spinlock_t my_lock;
int shared_counter;
// 进程上下文
void process_side(void) {
unsigned long flags;
spin_lock_irqsave(&my_lock, flags); // 保存中断状态并禁用中断
shared_counter++;
spin_unlock_irqrestore(&my_lock, flags); // 恢复中断状态
}
// 中断上下文
irqreturn_t my_interrupt(int irq, void *dev_id) {
spin_lock(&my_lock); // 中断中中断已自动禁用,无需再禁
shared_counter++;
spin_unlock(&my_lock);
return IRQ_HANDLED;
}
Warning

为什么不统一用 spin_lock_irqsave()?因为禁用中断是有代价的——它会延迟系统中所有中断的响应。在不需要禁用中断的场景下使用 spin_lock(),可以让中断处理程序继续响应,提升系统实时性。

3.3 Ticket Lock 与 MCS Lock#

Linux 自旋锁的实现经历了重要演进:

  • Ticket Lock(2.6.x 起):解决了”饥饿”问题。每个锁请求获得一个递增的 ticket 号,锁释放时 head 递增,只有 ticket == head 的等待者才能获取锁。保证了 FIFO 公平性。

  • MCS Lock(较新内核):解决了”惊群”问题。Ticket Lock 释放时所有自旋者都会竞争缓存行,造成总线流量风暴。MCS Lock 让每个等待者在自己的本地变量上自旋,释放时只通知下一个等待者。

// include/linux/mcs_spinlock.h(简化)
struct mcs_spinlock {
struct mcs_spinlock *next; // 指向下一个等待者
int locked; // 本地自旋变量:1=需要等待,0=获得锁
};

四、互斥锁(Mutex)#

4.1 可睡眠的重量级锁#

互斥锁(Mutex)是内核中最常用的可睡眠锁。当锁被持有时,请求者不会忙等,而是将自己标记为 TASK_UNINTERRUPTIBLE 状态并让出 CPU:

kernel/locking/mutex.c
void __sched mutex_lock(struct mutex *lock)
{
// 快速路径:无争用时直接获取
if (atomic_long_try_cmpxchg_acquire(&lock->owner, 0, current))
return;
// 慢速路径:加入等待队列,睡眠等待
__mutex_lock_slowpath(lock);
}

互斥锁与自旋锁的核心区别:

特性自旋锁互斥锁
等待方式忙等(CPU 空转)睡眠(让出 CPU)
可否睡眠持锁时不可睡眠持锁时可以睡眠
上下文限制进程 + 中断上下文均可进程上下文
持锁时间必须极短可以较长
开销(无争用)极低(一条原子指令)较低(cmpxchg + 分支)
开销(有争用)CPU 空转浪费上下文切换开销

4.2 Mutex 的乐观自旋#

一个精妙的优化:即使互斥锁是”可睡眠锁”,在持锁者正在运行时,等待者也会短暂自旋——因为持锁者可能很快释放锁,此时自旋比睡眠-唤醒的上下文切换代价更低:

// kernel/locking/mutex.c(简化)
static bool __mutex_optimistic_spin(struct mutex *lock,
struct mutex_waiter *waiter)
{
// 当持锁者正在当前 CPU 上执行时,自旋等待
while (true) {
// 尝试获取锁
if (atomic_long_try_cmpxchg_acquire(&lock->owner, owner, current))
return true;
// 持锁者不在运行?停止自旋,去睡眠
if (!owner_on_cpu(owner))
return false;
cpu_relax();
}
}

这种”乐观自旋”策略在低争用场景下显著提升了互斥锁的性能——它融合了自旋锁的低延迟和互斥锁的不浪费 CPU 的优点。

4.3 Mutex 的调试支持#

内核提供了 CONFIG_DEBUG_MUTEXES 选项,启用后会检测:

  • 重复加锁(同一任务对同一 mutex 加锁两次)
  • 非持锁者解锁
  • 持锁时任务退出(未释放锁)
  • 中断上下文中使用 mutex

五、读写锁与顺序锁#

5.1 读写自旋锁(rwlock)#

读写锁区分读访问和写访问:多个读者可以并行持有读锁,但写锁是排他的。适用于读多写少的场景:

// 定义与使用
rwlock_t my_rwlock;
// 读端
read_lock(&my_rwlock);
// 读取共享数据——多个读者可并行
read_unlock(&my_rwlock);
// 写端
write_lock(&my_rwlock);
// 修改共享数据——排他访问
write_unlock(&my_rwlock);

读写自旋锁的局限性:写者可能”饿死”——如果读者源源不断,写者永远无法获取写锁。因此 Linux 内核中读写锁的使用正在减少,更多场景转向 RCU 或顺序锁。

5.2 顺序锁(Seqlock)#

顺序锁采用了一种截然不同的思路:读者不加锁,写者用序列号保护

include/linux/seqlock.h
typedef struct {
unsigned sequence; // 序列号,偶数=无写者,奇数=正在写入
spinlock_t lock; // 保护写者之间互斥
} seqlock_t;
// 读者(无锁)
unsigned int seq;
do {
seq = read_seqbegin(&my_seqlock); // 读取序列号
// 读取共享数据...
} while (read_seqretry(&my_seqlock, seq)); // 检查序列号是否变化
// 写者
write_seqlock(&my_seqlock); // 序列号++(奇数),获取自旋锁
// 修改共享数据...
write_sequnlock(&my_seqlock); // 序列号++(偶数),释放自旋锁

顺序锁的工作原理:

  1. 读者先读取序列号,然后读取数据,最后再检查序列号
  2. 如果两次序列号不同(或为奇数),说明读取期间有写入,需要重试
  3. 写者通过自旋锁互斥,通过递增序列号通知读者

顺序锁的适用条件

  • 读者远多于写者
  • 共享数据是简单类型(指针、整数等),可以原子读取
  • 写者不能在读者持有引用时修改数据结构(否则需要 RCU)

顺序锁在内核中的典型应用:时间读取(do_gettimeofday())、/proc 统计信息等。

六、RCU:读无锁的终极方案#

6.1 RCU 的核心思想#

RCU(Read-Copy-Update)是 Linux 内核中最精巧的同步机制,它实现了读者零开销——读者不需要获取任何锁,不需要原子操作,甚至不需要内存屏障:

// 读者——完全无锁、无屏障
rcu_read_lock(); // 仅标记进入读端临界区(preempt_disable)
p = rcu_dereference(gp); // 读取指针(带数据依赖屏障)
// 使用 p 指向的数据...
rcu_read_unlock(); // 标记退出(preempt_enable)
// 写者——创建副本,修改副本,替换指针
new = kmalloc(sizeof(*old), GFP_KERNEL);
*new = *old; // 复制旧数据
new->field = new_value; // 修改副本
rcu_assign_pointer(gp, new); // 原子替换指针
synchronize_rcu(); // 等待所有读者退出
kfree(old); // 安全释放旧数据
sequenceDiagram participant R1 as 读者1 participant R2 as 读者2 participant GP as 全局指针 gp participant W as 写者 R1->>GP: rcu_dereference(gp) → 旧数据 R2->>GP: rcu_dereference(gp) → 旧数据 W->>W: 复制旧数据 → new 副本 W->>W: 修改 new 副本 W->>GP: rcu_assign_pointer(gp, new) Note over R1,R2: 读者1、2 仍持有旧数据引用 R1->>R1: rcu_read_unlock() 退出 R2->>R2: rcu_read_unlock() 退出 W->>W: synchronize_rcu() 宽限期结束 W->>W: kfree(old) 安全释放旧数据

6.2 宽限期(Grace Period)#

RCU 的核心概念是宽限期:从写者替换指针到所有旧读者退出的时间窗口。只有宽限期结束后,旧数据才能被安全释放。

时间轴:
|----读者A进入----读者A退出----|
|----读者B进入----读者B退出----|
|----读者C进入---------读者C退出----|
^写者替换指针
|<---------- 宽限期 ---------->|
^所有旧读者已退出
^可安全释放旧数据

synchronize_rcu() 的实现非常巧妙——它不需要逐个追踪读者,而是利用了以下观察:在宽限期开始后,所有 CPU 都至少经历了一次上下文切换(或经过了一个静默期),就意味着所有之前的读者都已退出

// kernel/rcu/tree.c(简化)
void synchronize_rcu(void)
{
RCU_LOCKDEP_WARN(lock_is_held(&rcu_bh_lock_map),
"Illegal synchronize_rcu() in RCU read-side critical section");
if (rcu_blocking_is_gp())
return; // UP 系统,直接返回
// 等待宽限期
wait_rcu_gp(call_rcu);
}

6.3 rcu_read_lock / rcu_read_unlock 的本质#

在可抢占内核中,rcu_read_lock() 本质上是 preempt_disable()——禁用抢占保证了读端临界区不会跨越上下文切换,从而让宽限期检测有明确的”静默期”判断依据。

在不可抢占内核中,rcu_read_lock() 编译为空操作——因为不可抢占内核中上下文切换只发生在显式的调度点,读者天然不会在临界区内被切换。

6.4 call_rcu:异步回调#

synchronize_rcu() 是同步等待,会阻塞调用者。对于性能敏感路径,可以使用异步版本:

// 注册回调,宽限期结束后自动调用
struct foo {
int a;
struct rcu_head rcu; // RCU 回调头——必须作为结构体成员
};
static void foo_reclaim(struct rcu_head *rh)
{
struct foo *p = container_of(rh, struct foo, rcu);
kfree(p); // 宽限期结束后安全释放
}
// 写者
void foo_update(struct foo **global_ptr, int new_a)
{
struct foo *new = kmalloc(sizeof(*new), GFP_KERNEL);
struct foo *old = *global_ptr;
*new = *old;
new->a = new_a;
rcu_assign_pointer(*global_ptr, new);
call_rcu(&old->rcu, foo_reclaim); // 异步释放
}

6.5 RCU 的变体#

变体读端开销宽限期延迟适用场景
RCU(经典)preempt_disable较长通用内核数据结构
SRCU(可睡眠)可睡眠读端显式 srcu_read_lock 返回值需要在读端睡眠
RCU Tasks较长追踪 BPF 程序等
Tiny RCU极低极短UP 小系统
Note

RCU 并非万能药。它的适用前提是:指针间接访问——读者通过指针访问数据,写者修改副本后替换指针。对于需要原地修改的场景(如计数器),RCU 不适用,应使用原子操作或 percpu 变量。

七、Futex:用户态快速互斥锁#

7.1 Futex 的设计动机#

传统 System V 信号量在无争用时也需要系统调用,开销巨大(每次操作约 1-5μs)。Futex(Fast Userspace Mutex)的核心思想是:无争用时完全在用户态完成,只在需要等待/唤醒时才进入内核

// 用户态 pthread_mutex_lock 的简化实现
void futex_lock(atomic_t *futex) {
// 快速路径:无争用,纯用户态
if (atomic_cmpxchg(futex, 0, 1) == 0)
return; // 获取成功,零系统调用
// 慢速路径:有争用,进入内核等待
while (atomic_xchg(futex, 2) != 0)
futex_wait(futex, 2); // 系统调用:在内核等待
}
void futex_unlock(atomic_t *futex) {
int old = atomic_xchg(futex, 0);
if (old == 2)
futex_wake(futex, 1); // 系统调用:唤醒一个等待者
}

futex 值的三个状态:

含义
0锁空闲
1锁被持有,无等待者
2锁被持有,有等待者

7.2 内核实现#

Futex 的内核实现位于 kernel/futex/

kernel/futex/
├── core.c # :哈希表、等待队列
├── pi.c # 优先级继承(Priority Inheritance)
├── requeue.c # futex_requeue 操作
├── waitwake.c # futex_wait / futex_wake 实现
└── futex.h # 内部头文件

内核为每个 futex 字(由用户态地址标识)维护一个等待队列,通过哈希表索引:

// kernel/futex/core.c(简化)
static struct futex_hash_bucket *futex_hash(union futex_key *key)
{
u32 hash = hash_futex(key);
return &futex_queues[hash & (futex_hashsize - 1)];
}
// futex_wait 的核心逻辑
int futex_wait(u32 __user *uaddr, unsigned int flags, u32 val,
ktime_t *abs_time, u32 bitset)
{
// 1. 验证 *uaddr == val(防止丢失唤醒)
// 2. 将当前任务加入哈希桶等待队列
// 3. 设置 TASK_INTERRUPTIBLE 并调度
// 4. 被唤醒后从等待队列移除
}

7.3 优先级继承(PI Futex)#

优先级反转是实时系统的噩梦:低优先级任务持有锁,中优先级任务抢占运行,高优先级任务无法获取锁。PI Futex 通过优先级继承解决此问题——当高优先级任务等待低优先级任务持有的锁时,低优先级任务临时继承高优先级:

// 使用 PI futex
pthread_mutexattr_setprotocol(&attr, PTHREAD_PRIO_INHERIT);

内核在 kernel/futex/pi.c 中实现了完整的优先级继承链追踪,支持嵌套锁的优先级传递。

八、完成量与信号量#

8.1 完成量(Completion)#

完成量是内核中”事件通知”的轻量机制——一个任务等待某事件完成,另一个任务完成后通知:

// 声明与初始化
struct completion my_comp;
init_completion(&my_comp);
// 等待方
wait_for_completion(&my_comp); // 不可中断等待
wait_for_completion_interruptible(&my_comp); // 可中断等待
wait_for_completion_timeout(&my_comp, timeout); // 超时等待
// 完成方
complete(&my_comp); // 唤醒一个等待者
complete_all(&my_comp); // 唤醒所有等待者

完成量的典型应用:驱动初始化等待硬件就绪、内核线程启动同步等。与信号量不同,完成量是一次性的——complete() 后不能”重置”,语义更清晰。

8.2 内核信号量(Semaphore)#

内核信号量是传统的计数信号量,允许指定初始计数:

// 定义与初始化
struct semaphore my_sem;
sema_init(&my_sem, 5); // 初始计数为 5,允许 5 个并发持有者
// 获取
down(&my_sem); // 不可中断
down_interruptible(&my_sem); // 可中断
down_trylock(&my_sem); // 非阻塞
// 释放
up(&my_sem);

内核信号量在现代内核中的使用正在减少——mutex 更高效、调试支持更好,completion 语义更清晰。信号量主要用于需要计数语义的场景(如连接池限制并发数)。

九、lockdep:死锁检测#

9.1 死锁的四种类型#

类型描述示例
AA 死锁同一任务对同一锁重复加锁spin_lock(&A); spin_lock(&A);
ABBA 死锁两个任务以不同顺序获取两把锁任务 1: lock(A)->lock(B); 任务 2: lock(B)->lock(A)
AB-BC-CA 死锁三个任务形成锁序环三把锁形成循环依赖
中断与进程死锁进程持锁时被中断,中断又请求同一锁进程 spin_lock(&A) → 中断 spin_lock(&A)

9.2 lockdep 的工作原理#

lockdep(Lock Dependency Validator)在内核启动时通过 CONFIG_LOCKDEP 启用,它维护两张图:

  1. 锁序图:记录所有锁的获取顺序,检测是否存在环(ABBA 死锁)
  2. 中断安全图:记录哪些锁在中断上下文中使用,检测中断-进程死锁
// kernel/locking/lockdep.c(简化逻辑)
// 每次加锁时:
void lock_acquire(struct lockdep_map *lock, unsigned int subclass,
int trylock, int read, int check,
struct lockdep_map *nest_lock, unsigned long ip)
{
// 1. 检查当前任务已持有的锁链中是否已包含此锁(AA 死锁)
// 2. 检查新的锁序是否与已有锁序形成环(ABBA 死锁)
// 3. 检查中断上下文使用是否合规
// 4. 将新的锁序关系加入锁序图
}

9.3 使用 lockdep#

# 启用 lockdep
echo 1 > /proc/sys/kernel/lockdep
# 查看锁统计
cat /proc/lockdep_stats
# 查看锁依赖链
cat /proc/lockdep_chains
# 查看最常争用的锁
cat /proc/lockdep_stats | grep contentions

lockdep 的典型输出:

======================================================
WARNING: possible circular locking dependency detected
------------------------------------------------------
task/1234 is trying to acquire lock:
(&my_lock#2){+.+.}, at: my_function+0x42/0x100
but task is already holding lock:
(&other_lock){+.+.}, at: other_function+0x1a/0x80
which lock already depends on the new lock.
Important

lockdep 在检测到第一个锁序违规后会关闭自身,避免产生过多输出。这意味着它不能替代代码审查——只能作为辅助工具。此外,lockdep 有运行时开销(约 5-10%),生产环境通常不启用。

十、percpu 变量#

10.1 每个CPU一份副本#

percpu 变量是避免锁争用的终极手段——每个 CPU 拥有变量的独立副本,访问自己的副本不需要任何同步:

// 定义 percpu 变量
DEFINE_PER_CPU(int, my_counter);
// 静态定义
static DEFINE_PER_CPU(struct my_struct, my_data);
// 动态分配
int __percpu *ptr = alloc_percpu(int);
// 访问(需禁用抢占)
void increment_counter(void) {
preempt_disable(); // 禁用抢占,防止访问期间被迁移到其他 CPU
this_cpu_inc(my_counter); // 原子地递增当前 CPU 的副本
preempt_enable();
}
// 安全访问宏(内置抢占保护)
this_cpu_inc(my_counter); // 等价于 preempt_disable + inc + preempt_enable
this_cpu_add(my_counter, val); // 加值
this_cpu_read(my_counter); // 读取

10.2 percpu 的内存布局#

percpu 区域在内存中有精心的布局:

percpu 内存布局(SMP 系统):
┌──────────────────────────────────────────────────────┐
│ CPU 0 区域 │ CPU 1 区域 │ CPU 2 区域 │ ... │
├───────────────┼───────────────┼───────────────┼─────┤
│ my_counter │ my_counter │ my_counter │ │
│ my_data │ my_data │ my_data │ │
│ ... │ ... │ ... │ │
└───────────────┴───────────────┴───────────────┴─────┘

每个 CPU 的副本在缓存行上对齐,避免了伪共享(False Sharing)——不同 CPU 修改变量的不同字段不会导致缓存行乒乓。

10.3 percpu 的典型应用#

percpu 变量在内核中无处不在:

  • 进程统计per_cpu(process_counts, cpu) — 每个 CPU 独立计数
  • Slub 分配器:每个 CPU 的本地缓存(struct kmem_cache_cpu
  • 网络统计SNMP_STATS — 每个 CPU 独立统计,读取时求和
  • RCUrcu_data — 每个 CPU 的宽限期检测状态
// 统计所有 CPU 的计数器总和
int total = 0;
int cpu;
for_each_possible_cpu(cpu)
total += per_cpu(my_counter, cpu);
Note

percpu 变量的”求和”操作本身不是原子的——在遍历期间其他 CPU 可能正在修改各自的副本。对于需要精确一致性的场景,仍需使用锁或原子操作。percpu 的优势在于高频写入无争用,而非读取时的一致性。

十一、同步原语选择指南#

flowchart TD Start["需要保护共享数据"] --> Q1{"中断上下文<br/>是否访问?"} Q1 -->|是| Q2{"持锁时间极短?"} Q2 -->|是| Spin["spin_lock_irqsave()<br/>自旋锁 + 禁用中断"] Q2 -->|否| Rethink["重新设计:<br/>将工作推迟到进程上下文"] Q1 -->|否| Q3{"读写比例?"} Q3 -->|读多写少| Q4{"读者能否容忍<br/>偶尔重试?"} Q4 -->|是| RCU["RCU<br/>读无锁"] Q4 -->|否| RW["读写锁 / 顺序锁"] Q3 -->|读写均衡| Q5{"持锁时间?"} Q5 -->|极短| Atomic["原子操作<br/>atomic_t / cmpxchg"] Q5 -->|较短| SpinN["spin_lock()<br/>自旋锁"] Q5 -->|较长| Mutex["mutex<br/>互斥锁"] Q3 -->|仅写入| Q6{"简单计数器?"} Q6 -->|是| Percpu["percpu 变量"] Q6 -->|否| Mutex2["mutex"] style RCU fill:#4CAF50,color:#fff style Spin fill:#FF9800,color:#fff style Mutex fill:#2196F3,color:#fff style Percpu fill:#9C27B0,color:#fff

十二、动手实践#

实验 1:观察内核锁争用#

# 1. 使用 perf 统计锁争用
sudo perf lock record sleep 10
sudo perf lock report
# 2. 查看特定锁的争用情况
sudo perf lock record -g -- sleep 5
sudo perf lock report -n
# 3. 使用 BPF 追踪 mutex 锁持锁时间
sudo bpftrace -e '
kprobe:mutex_lock { @start[tid] = nsecs; }
kretprobe:mutex_lock /@start[tid]/ {
@ns[func] = hist(nsecs - @start[tid]);
delete(@start[tid]);
}'

实验 2:验证原子操作与竞态条件#

// race_test.c — 演示竞态条件
#include <linux/module.h>
#include <linux/kernel.h>
#include <linux/kthread.h>
#include <linux/atomic.h>
static int unsafe_counter = 0;
static atomic_t safe_counter = ATOMIC_INIT(0);
static int thread_fn(void *data)
{
int i;
for (i = 0; i < 1000000; i++) {
unsafe_counter++; // 不安全!
atomic_inc(&safe_counter); // 安全
}
return 0;
}
static int __init race_init(void)
{
struct task_struct *t1, *t2;
t1 = kthread_run(thread_fn, NULL, "race_thread1");
t2 = kthread_run(thread_fn, NULL, "race_thread2");
msleep(3000); // 等待线程完成
pr_info("unsafe_counter = %d (expected 2000000)\n", unsafe_counter);
pr_info("safe_counter = %d (expected 2000000)\n", atomic_read(&safe_counter));
return 0;
}
module_init(race_init);
MODULE_LICENSE("GPL");

实验 3:使用 lockdep 检测死锁#

# 1. 启用 lockdep
echo 1 > /proc/sys/kernel/lockdep
# 2. 查看锁依赖统计
cat /proc/lockdep_stats
# 3. 查看最常争用的前 20 把锁
cat /proc/lockdep_stats | head -20
# 4. 在 dmesg 中查看 lockdep 警告
dmesg | grep "lockdep"

实验 4:观察 RCU 宽限期#

# 1. 查看 RCU 统计信息
cat /proc/rcu/rcuexp
cat /proc/rcu/rcugp
# 2. 追踪 synchronize_rcu 调用
sudo bpftrace -e '
kprobe:synchronize_rcu {
printf("synchronize_rcu called from %s\n", kstack);
}'
# 3. 查看 RCU 回调队列长度
cat /proc/rcu/rcucb

实验 5:percpu 变量实践#

# 1. 查看 percpu 统计信息
cat /proc/zoneinfo | grep "percpu"
# 2. 使用 BPF 观察每个 CPU 的中断计数
sudo bpftrace -e '
kprobe:handle_irq {
@irq_count[cpu] = count();
}'

参考资料#

内核源码#

路径内容
kernel/locking/spinlock.c自旋锁实现
kernel/locking/mutex.c互斥锁实现
kernel/locking/lockdep.c死锁检测
kernel/rcu/tree.cTree RCU 实现
kernel/rcu/tree_plugin.h可抢占 RCU 扩展
kernel/rcu/srcutree.cSRCU 实现
kernel/futex/core.cFutex 核心实现
kernel/futex/pi.cPI Futex 优先级继承
include/linux/spinlock.h自旋锁接口定义
include/linux/mutex.h互斥锁接口定义
include/linux/rcupdate.hRCU 接口定义
include/linux/percpu.hpercpu 变量接口
include/linux/seqlock.h顺序锁接口
include/linux/completion.h完成量接口

经典文档与书籍#

  • 《Is Parallel Programming Hard, And, If So, What Can You Do About It?》 — Paul E. McKenney(RCU 作者),对内核同步机制最权威的阐述
  • 《Linux 内核设计与实现》 第 7 章 — Robert Love,同步原语概述
  • 《深入理解 Linux 内核》 第 5 章 — Bovet & Cesati,内核同步详解
  • Documentation/RCU/whatisRCU.rst — 内核源码中的 RCU 入门文档
  • Documentation/locking/lockdep-design.rst — lockdep 设计文档
  • Documentation/locking/futex.rst — Futex 设计文档
  • Futex 原论文 — Hubertus Franke 等人,2002 OLS

在线资源#

支持与分享

如果这篇文章对你有帮助,欢迎支持作者或分享给更多人

内核同步机制
https://blog.souloss.com/posts/linux-internals/kernel-synchronization/
作者
Souloss
发布于
2024-10-19
许可协议
CC BY-NC-SA 4.0

部分信息可能已经过时