mobile wallpaper 1mobile wallpaper 2mobile wallpaper 3mobile wallpaper 4
2176 字
6 分钟
同步原语:自旋锁、互斥锁与信号量
2021-08-02

有了多任务和中断,就出现了并发问题。两个任务同时修改同一块数据,结果取决于谁先执行——这就是竞态条件(race condition)。竞态条件让程序行为变得不可预测,是并发编程中最危险的陷阱。

并发问题与竞态条件#

在多任务环境下,当多个任务同时访问共享变量时,可能会出现竞态条件。看一个简单的例子:

// 共享变量
int counter = 0;
// 任务A和任务B都执行这段代码
void increment(void) {
counter++; // 这不是原子操作!
}

counter++ 看起来很简单,但编译后实际上包含三步:

  1. 读取 counter 到寄存器
  2. 寄存器值 +1
  3. 写回 counter

如果两个任务同时执行,可能出现:

时间 | 任务A | 任务B
-----|-------------------|-------------------
T1 | 读取 counter=0 |
T2 | | 读取 counter=0
T3 | 寄存器+1 (=1) |
T4 | | 寄存器+1 (=1)
T5 | 写回 counter=1 |
T6 | | 写回 counter=1

预期结果应该是 2,但实际结果是 1!这就是竞态条件。为了解决这个问题,需要使用同步原语来保护临界区。

竞态条件的根源是多个执行流同时访问共享数据。最直接的解决方案是让一个执行流在访问时独占——这就是锁的概念。自旋锁是最简单的锁:如果锁已被占用,就不断循环检查(自旋),直到锁释放。

自旋锁(Spinlock)#

自旋锁用于保护非常短的临界区,特别是在中断上下文或不能睡眠的场景中。它通过原子交换操作实现锁的获取,如果锁被占用,获取者会循环检查直到锁可用。自旋锁是一种忙等待锁,其工作流程如下:

flowchart TD A[尝试获取自旋锁] --> B{锁是否空闲?} B -->|是| C[原子交换设置锁] B -->|否| D[循环自旋等待] D --> B C --> E[进入临界区] E --> F[执行临界区代码] F --> G[释放锁] G --> H[设置锁为空闲] H --> I[离开临界区]
typedef struct
{
volatile uint32_t lock; // 0: 未锁定, 1: 锁定
volatile uint32_t interrupt_mask; // 保存中断状态(用于中断安全版本)
} spinlock_t;
#define LOCKED_YES 1
#define LOCKED_NO 0

自旋锁的核心实现使用了 atomic_exchange 原子操作:

/**
* spinlock_lock - 纯线程间抢锁(不能在 ISR 里使用)
* @splock: 锁对象
*
* 使用原子交换操作实现忙等待锁获取
*/
void spinlock_lock(spinlock_t *splock)
{
/* 如果锁已被占用,会一直空转直到持有者释放 */
while (atomic_exchange(&splock->lock, LOCKED_YES) != LOCKED_NO)
{
/* 单核空转时不需要 pause,因为不会有第二个核释放锁 */
}
}
/**
* spinlock_unlock - 释放锁(纯线程版本)
* @splock: 锁对象
*
* 单核下可以直接赋值,因为关中断后唯一执行流就是当前代码
*/
void spinlock_unlock(spinlock_t *splock)
{
splock->lock = LOCKED_NO;
}
  • atomic_exchange 是一个原子操作,它原子地将新值写入目标地址并返回旧值
  • 如果返回值是 LOCKED_NO(0),说明锁之前是空闲的,我们成功获取了锁
  • 如果返回值是 LOCKED_YES(1),说明锁已被占用,继续循环等待
  • 释放锁时只需将 lock 字段设置为 LOCKED_NO

在可能被中断处理程序访问的临界区中,需要使用带中断保护的版本:

/**
* spinlock_lock_irqsave - 带"关中断"功能的抢锁
* @splock: 锁对象
*
* 执行步骤:
* 1. 保存当前 EFLAGS 的 IF 位
* 2. 执行 cli 关闭本地中断
* 3. 用原子交换抢锁
* 4. 解锁时根据 interrupt_mask 决定是否重新 sti
*/
void spinlock_lock_irqsave(spinlock_t *splock)
{
uint32_t eflags = get_eflags(); /* 读取进入前的完整标志寄存器 */
disable_interrupts(); /* cli:本地 CPU 不再响应 IRQ */
splock->interrupt_mask = (eflags & (1 << 9)); /* 只保留 IF 位 */
/* 下面与 spinlock_lock 完全相同 */
while (atomic_exchange(&splock->lock, LOCKED_YES) != LOCKED_NO)
{
}
}
/**
* spinlock_unlock_irqrestore - 释放锁并恢复中断状态
* @splock: 锁对象
*
* 恢复顺序:先放锁 → 再恢复中断,防止在临界区里被中断插进来
*/
void spinlock_unlock_irqrestore(spinlock_t *splock)
{
splock->lock = LOCKED_NO; /* 放锁 */
if (splock->interrupt_mask)
{ /* 如果进锁前 IF==1,才重新开中断 */
enable_interrupts(); /* sti */
}
}
  • 在获取锁前先保存中断状态并关闭中断
  • 防止在临界区内被中断打断而导致死锁
  • 释放锁时根据之前保存的中断状态决定是否重新开中断
  • 保证”谁关谁开”,不会误把外层已关闭的中断重新打开

自旋锁适用于短临界区(持有锁时间非常短,自旋等待的开销小于任务切换)、中断上下文(不能睡眠的环境,中断处理程序不能调用 schedule)以及多核系统(自旋等待比睡眠更高效,因为可能很快在其他核上释放)。

自旋锁简单粗暴,但有一个问题:如果等待时间较长,自旋会浪费 CPU 时间。互斥锁改进了这一点——当锁不可用时,当前任务主动让出 CPU,进入阻塞状态,等锁释放时再被唤醒。

互斥锁(Mutex)#

互斥锁在锁被占用时会让任务睡眠,而不是自旋等待。这对于持锁时间较长的场景更高效,可以避免 CPU 资源的浪费。互斥锁使用等待队列来管理阻塞的任务:

flowchart TD A[尝试获取互斥锁] --> B{锁是否空闲?} B -->|是| C[原子交换设置锁] B -->|否| D[加入等待队列] C --> E[获取锁成功] D --> F[设置任务状态为 BLOCKED] F --> G[调用 schedule 切换任务] E --> H[进入临界区] H --> I[执行临界区代码] I --> J[释放锁] J --> K{有等待任务?} K -->|是| L[唤醒第一个等待者] K -->|否| M[设置锁为空闲] L --> N[锁转移给等待者] M --> O[离开临界区]
typedef struct mutex
{
volatile uint32_t hold; // 0: 未锁定, 1: 锁定
volatile struct linked_list_node *thread_node; // 持有者(未来可用于调试)
linked_list_t waiting_task_queue; // 等待队列
yieldlock_t ydlock; // 保护内部状态的锁
} mutex_t;

互斥锁的实现使用了 yieldlock(另一种锁机制)来保护内部状态:

/**
* mutex_lock - 获取互斥锁
* @mp: 互斥锁对象
*
* 如果锁被占用,将当前任务加入等待队列并阻塞
*/
void mutex_lock(mutex_t *mp)
{
while (1) {
yieldlock_lock(&mp->ydlock);
if (atomic_exchange(&mp->hold, LOCKED_YES) == LOCKED_NO) {
/* 成功获取锁 */
yieldlock_unlock(&mp->ydlock);
return;
}
/* 获取失败:将当前任务加入等待队列并阻塞 */
task_t *self = current_task();
linked_list_node_t *node = (linked_list_node_t *)kmalloc(sizeof(linked_list_node_t));
if (node != NULL) {
node->ptr = (type_t)self;
node->prev = NULL;
node->next = NULL;
linked_list_append(&mp->waiting_task_queue, node);
}
yieldlock_unlock(&mp->ydlock);
/* 阻塞当前任务,schedule() 会切换到其他任务 */
task_block(self);
/* 被唤醒后重新尝试获取锁 */
}
}
/**
* mutex_unlock - 释放互斥锁
* @mp: 互斥锁对象
*
* 如果有等待者,唤醒队列中的第一个
*/
void mutex_unlock(mutex_t *mp)
{
yieldlock_lock(&mp->ydlock);
mp->hold = LOCKED_NO;
mp->thread_node = NULL;
/* 如果有等待者,唤醒第一个 */
if (mp->waiting_task_queue.size != 0) {
linked_list_node_t *head = mp->waiting_task_queue.head;
task_t *waiter = (task_t *)head->ptr;
linked_list_remove(&mp->waiting_task_queue, head);
kfree(head);
yieldlock_unlock(&mp->ydlock);
task_unblock(waiter);
} else {
yieldlock_unlock(&mp->ydlock);
}
}
  • mutex_lock 使用 atomic_exchange 尝试获取锁
  • 如果锁已被占用,将当前任务加入等待队列并调用 task_block 阻塞
  • 阻塞后的任务会被调度器切换出去,不会占用 CPU 资源
  • mutex_unlock 检查是否有等待者,如果有则唤醒第一个等待者
  • 注意:锁会直接转移给等待者,而不是释放锁让等待者重新竞争

互斥锁只能保护互斥访问(0或1),但有时需要更灵活的同步——比如限制同时访问的资源数量。信号量通过计数器实现了这种能力。

信号量(Semaphore)#

信号量是一个计数器,用于控制对资源的访问数量。它不仅可以用于互斥访问(信号量初始值为 1),还可以用于资源池管理(信号量初始值大于 1)。信号量通过 P 操作(wait)和 V 操作(signal)来管理资源:

flowchart TD subgraph P操作["P操作 (sem_wait)"] A1[尝试获取资源] --> B1{count > 0?} B1 -->|是| C1[count--] B1 -->|否| D1[加入等待队列] C1 --> E1[获取资源成功] D1 --> F1[阻塞当前任务] end subgraph V操作["V操作 (sem_signal)"] A2[释放资源] --> B2{有等待者?} B2 -->|是| C2[唤醒第一个等待者] B2 -->|否| D2[count++] C2 --> E2[等待者获取资源] D2 --> F2[增加可用资源] end
typedef struct semaphore {
volatile int count; // 可用资源数量
linked_list_t wait_queue; // 等待队列
spinlock_t lock; // 保护内部状态
} semaphore_t;

信号量的实现清晰地展示了 P/V 操作的语义:

/**
* sem_init - 初始化信号量
* @sem: 信号量对象
* @initial_count: 初始资源数量
*/
void sem_init(semaphore_t *sem, int initial_count)
{
sem->count = initial_count;
linked_list_init(&sem->wait_queue);
spinlock_init(&sem->lock);
}
/**
* sem_wait - P 操作:等待/获取资源
* @sem: 信号量对象
*
* 如果有资源可用,递减计数;否则阻塞当前任务
*/
void sem_wait(semaphore_t *sem)
{
spinlock_acquire(&sem->lock);
if (sem->count > 0) {
/* 有资源可用,直接获取 */
sem->count--;
spinlock_release(&sem->lock);
return;
}
/* count == 0:将当前任务加入等待队列并阻塞 */
task_t *self = current_task();
linked_list_node_t *node = (linked_list_node_t *)kmalloc(sizeof(linked_list_node_t));
if (node != NULL) {
node->ptr = (type_t)self;
node->prev = NULL;
node->next = NULL;
linked_list_append(&sem->wait_queue, node);
}
spinlock_release(&sem->lock);
task_block(self);
}
/**
* sem_signal - V 操作:释放/增加资源
* @sem: 信号量对象
*
* 如果有等待者,唤醒一个;否则递增计数
*/
void sem_signal(semaphore_t *sem)
{
spinlock_acquire(&sem->lock);
if (sem->wait_queue.size > 0) {
/* 有等待者,唤醒第一个 */
linked_list_node_t *head = sem->wait_queue.head;
task_t *waiter = (task_t *)head->ptr;
linked_list_remove(&sem->wait_queue, head);
kfree(head);
spinlock_release(&sem->lock);
task_unblock(waiter);
} else {
/* 没有等待者,增加资源计数 */
sem->count++;
spinlock_release(&sem->lock);
}
}
  • sem_wait(P 操作)首先检查 count 是否大于 0
  • 如果有资源可用,递减计数并立即返回
  • 如果没有资源,将当前任务加入等待队列并阻塞
  • sem_signal(V 操作)检查是否有等待者
  • 如果有等待者,唤醒第一个等待者(不增加计数,资源直接转移)
  • 如果没有等待者,递增计数表示资源增加

经典应用:生产者-消费者#

信号量的经典应用场景是生产者-消费者问题:

semaphore_t empty; // 空槽位数量
semaphore_t full; // 已填充槽位数量
mutex_t mutex; // 保护缓冲区
// 初始化
sem_init(&empty, BUFFER_SIZE); // 初始有 BUFFER_SIZE 个空槽位
sem_init(&full, 0); // 初始没有已填充的槽位
mutex_init(&mutex);
void producer(void) {
while (1) {
item = produce_item();
sem_wait(&empty); // 等待空槽位
mutex_lock(&mutex);
buffer_put(item);
mutex_unlock(&mutex);
sem_post(&full); // 增加已填充槽位
}
}
void consumer(void) {
while (1) {
sem_wait(&full); // 等待数据
mutex_lock(&mutex);
item = buffer_get();
mutex_unlock(&mutex);
sem_post(&empty); // 增加空槽位
consume_item(item);
}
}
  • empty 信号量控制缓冲区的空槽位数量
  • full 信号量控制缓冲区的已填充槽位数量
  • 生产者等待空槽位,消费者等待已填充槽位
  • 互斥锁保护缓冲区的实际读写操作
  • 这种模式确保了生产者不会写入已满的缓冲区,消费者不会读取空的缓冲区

代码实现#

文件结构#

11.kernel-sync/
├── boot/
│ ├── mbr.S
│ └── loader.S
├── kernel/
│ ├── include/
│ │ ├── spinlock.h # 自旋锁接口定义
│ │ ├── mutex.h # 互斥锁接口定义
│ │ ├── semaphore.h # 信号量接口定义
│ │ ├── yieldlock.h # yieldlock 接口
│ │ └── ...
│ ├── sync/
│ │ ├── spinlock.c # 自旋锁实现(使用 atomic_exchange)
│ │ ├── spinlock.S # 自旋锁汇编辅助函数
│ │ ├── mutex.c # 互斥锁实现(带等待队列)
│ │ ├── semaphore.c # 信号量实现(P/V 操作)
│ │ └── yieldlock.c # yieldlock 实现
│ └── kernel.c # 主内核函数,包含测试代码
└── Makefile

自旋锁获取流程#

sequenceDiagram participant Task1 as 任务1 participant Task2 as 任务2 participant Lock as 自旋锁 Task1->>Lock: atomic_exchange(&lock, 1) Lock-->>Task1: 返回 0(成功) Note over Task1: 获取锁,进入临界区 Task2->>Lock: atomic_exchange(&lock, 1) Lock-->>Task2: 返回 1(失败) Task2->>Task2: 循环自旋等待 Task1->>Lock: lock = 0(释放锁) Note over Task1: 离开临界区 Task2->>Lock: atomic_exchange(&lock, 1) Lock-->>Task2: 返回 0(成功) Note over Task2: 获取锁,进入临界区

互斥锁阻塞唤醒流程#

sequenceDiagram participant Task1 as 任务1(持有者) participant Task2 as 任务2(等待者) participant Mutex as 互斥锁 participant Scheduler as 调度器 Task1->>Mutex: atomic_exchange(&hold, 1) Note over Task1: 获取锁,进入临界区 Task2->>Mutex: atomic_exchange(&hold, 1) Mutex-->>Task2: 返回 1(失败) Task2->>Task2: 加入等待队列 Task2->>Task2: task_block(self) Task2->>Scheduler: 切换任务 Note over Task2: 被阻塞,不占用 CPU Task1->>Mutex: 释放锁 Mutex->>Task2: task_unblock(waiter) Mutex->>Scheduler: 加入就绪队列 Note over Task1: 离开临界区 Scheduler->>Task2: 调度运行 Task2->>Mutex: 重新尝试获取锁 Note over Task2: 获取成功,进入临界区

三种同步原语的对比#

特性自旋锁(Spinlock)互斥锁(Mutex)信号量(Semaphore)
等待方式忙等待(自旋)阻塞(睡眠)阻塞(睡眠)
适用场景短临界区、中断上下文长临界区、普通任务资源计数、生产者-消费者
CPU 开销高(自旋时占用 CPU)低(阻塞时不占用 CPU)低(阻塞时不占用 CPU)
资源数1(互斥)1(互斥)N(可配置)
实现复杂度简单(原子操作)中等(等待队列)中等(计数+队列)
中断安全需要特殊版本不适用于中断不适用于中断

运行与验证#

编译运行#

cd 11.kernel-sync
make clean
make all
make run

预期输出#

Testing spinlock...
Counter without lock: 98234 (expected: 100000) <- Race condition!
Counter with spinlock: 100000 (expected: 100000) <- Correct!
Testing mutex...
Task A trying to acquire mutex...
Task A acquired mutex
Task B trying to acquire mutex...
Task A releasing mutex...
Task B acquired mutex
Task B releasing mutex...
Testing semaphore (producer-consumer)...
Producer: produced item 0
Consumer: consumed item 0
Producer: produced item 1
Consumer: consumed item 1
...
Producer: produced item 99
Consumer: consumed item 99
All tests passed!

测试要点#

  1. 自旋锁测试:验证无锁情况下的竞态条件和有锁情况下的正确性
  2. 互斥锁测试:验证任务的正确阻塞和唤醒
  3. 信号量测试:验证生产者-消费者模式的数据正确性

踩坑记录#

1. 死锁(Deadlock)#

问题原因:任务A持有锁1等待锁2,任务B持有锁2等待锁1

// 错误示例
void task_a(void) {
mutex_lock(&lock1);
mutex_lock(&lock2);
/* ... */
mutex_unlock(&lock2);
mutex_unlock(&lock1);
}
void task_b(void) {
mutex_lock(&lock2); // 锁的获取顺序与 task_a 不同!
mutex_lock(&lock1);
/* ... */
mutex_unlock(&lock1);
mutex_unlock(&lock2);
}

解决方案:统一锁的获取顺序,所有任务都以相同的顺序获取锁

// 正确示例:所有任务都先获取 lock1,再获取 lock2
void task_a(void) {
mutex_lock(&lock1);
mutex_lock(&lock2);
/* ... */
mutex_unlock(&lock2);
mutex_unlock(&lock1);
}
void task_b(void) {
mutex_lock(&lock1); // 与 task_a 相同的顺序
mutex_lock(&lock2);
/* ... */
mutex_unlock(&lock2);
mutex_unlock(&lock1);
}

2. 优先级反转(Priority Inversion)#

问题原因:低优先级任务持有锁,高优先级任务等待,中优先级任务抢占低优先级任务

解决方案:优先级继承协议,当高优先级任务等待低优先级任务持有的锁时,临时提升低优先级任务的优先级

3. 中断中的死锁#

问题原因:中断处理程序尝试获取已被任务持有的锁,导致死锁

解决方案:使用 spinlock_lock_irqsavespinlock_unlock_irqrestore,在获取锁前先关闭中断

// 错误示例
void interrupt_handler(void) {
spinlock_lock(&lock); // 如果任务A持有这个锁,会死锁!
/* ... */
spinlock_unlock(&lock);
}
// 正确示例
void interrupt_handler(void) {
spinlock_lock_irqsave(&lock); // 关中断再抢锁
/* ... */
spinlock_unlock_irqrestore(&lock); // 恢复中断状态
}

小结#

自旋锁用原子交换实现忙等待,适合短临界区和中断上下文;互斥锁在锁不可用时阻塞任务并加入等待队列,适合长临界区;信号量通过计数器管理资源访问数量,P/V 操作天然适配生产者-消费者这类需要协调资源供需的场景。三种原语从简单到复杂,覆盖了内核中绝大部分同步需求。阻塞和唤醒机制(task_block / task_unblock)是互斥锁和信号量的共同基础,让等待中的任务释放 CPU 而非空转。下一章将进入用户空间,实现特权级切换和系统调用框架,届时中断安全的自旋锁会再次出场。

参考#

支持与分享

如果这篇文章对你有帮助,欢迎支持作者或分享给更多人

同步原语:自旋锁、互斥锁与信号量
https://blog.souloss.com/posts/os/sync-spinlock-mutex-semaphore/
作者
Souloss
发布于
2021-08-02
许可协议
CC BY-NC-SA 4.0

部分信息可能已经过时