有了多任务和中断,就出现了并发问题。两个任务同时修改同一块数据,结果取决于谁先执行——这就是竞态条件(race condition)。竞态条件让程序行为变得不可预测,是并发编程中最危险的陷阱。
并发问题与竞态条件
在多任务环境下,当多个任务同时访问共享变量时,可能会出现竞态条件。看一个简单的例子:
// 共享变量int counter = 0;
// 任务A和任务B都执行这段代码void increment(void) { counter++; // 这不是原子操作!}counter++ 看起来很简单,但编译后实际上包含三步:
- 读取 counter 到寄存器
- 寄存器值 +1
- 写回 counter
如果两个任务同时执行,可能出现:
时间 | 任务A | 任务B-----|-------------------|-------------------T1 | 读取 counter=0 |T2 | | 读取 counter=0T3 | 寄存器+1 (=1) |T4 | | 寄存器+1 (=1)T5 | 写回 counter=1 |T6 | | 写回 counter=1预期结果应该是 2,但实际结果是 1!这就是竞态条件。为了解决这个问题,需要使用同步原语来保护临界区。
竞态条件的根源是多个执行流同时访问共享数据。最直接的解决方案是让一个执行流在访问时独占——这就是锁的概念。自旋锁是最简单的锁:如果锁已被占用,就不断循环检查(自旋),直到锁释放。
自旋锁(Spinlock)
自旋锁用于保护非常短的临界区,特别是在中断上下文或不能睡眠的场景中。它通过原子交换操作实现锁的获取,如果锁被占用,获取者会循环检查直到锁可用。自旋锁是一种忙等待锁,其工作流程如下:
typedef struct{ volatile uint32_t lock; // 0: 未锁定, 1: 锁定 volatile uint32_t interrupt_mask; // 保存中断状态(用于中断安全版本)} spinlock_t;
#define LOCKED_YES 1#define LOCKED_NO 0自旋锁的核心实现使用了 atomic_exchange 原子操作:
/** * spinlock_lock - 纯线程间抢锁(不能在 ISR 里使用) * @splock: 锁对象 * * 使用原子交换操作实现忙等待锁获取 */void spinlock_lock(spinlock_t *splock){ /* 如果锁已被占用,会一直空转直到持有者释放 */ while (atomic_exchange(&splock->lock, LOCKED_YES) != LOCKED_NO) { /* 单核空转时不需要 pause,因为不会有第二个核释放锁 */ }}
/** * spinlock_unlock - 释放锁(纯线程版本) * @splock: 锁对象 * * 单核下可以直接赋值,因为关中断后唯一执行流就是当前代码 */void spinlock_unlock(spinlock_t *splock){ splock->lock = LOCKED_NO;}atomic_exchange是一个原子操作,它原子地将新值写入目标地址并返回旧值- 如果返回值是
LOCKED_NO(0),说明锁之前是空闲的,我们成功获取了锁 - 如果返回值是
LOCKED_YES(1),说明锁已被占用,继续循环等待 - 释放锁时只需将
lock字段设置为LOCKED_NO
在可能被中断处理程序访问的临界区中,需要使用带中断保护的版本:
/** * spinlock_lock_irqsave - 带"关中断"功能的抢锁 * @splock: 锁对象 * * 执行步骤: * 1. 保存当前 EFLAGS 的 IF 位 * 2. 执行 cli 关闭本地中断 * 3. 用原子交换抢锁 * 4. 解锁时根据 interrupt_mask 决定是否重新 sti */void spinlock_lock_irqsave(spinlock_t *splock){ uint32_t eflags = get_eflags(); /* 读取进入前的完整标志寄存器 */ disable_interrupts(); /* cli:本地 CPU 不再响应 IRQ */ splock->interrupt_mask = (eflags & (1 << 9)); /* 只保留 IF 位 */
/* 下面与 spinlock_lock 完全相同 */ while (atomic_exchange(&splock->lock, LOCKED_YES) != LOCKED_NO) { }}
/** * spinlock_unlock_irqrestore - 释放锁并恢复中断状态 * @splock: 锁对象 * * 恢复顺序:先放锁 → 再恢复中断,防止在临界区里被中断插进来 */void spinlock_unlock_irqrestore(spinlock_t *splock){ splock->lock = LOCKED_NO; /* 放锁 */ if (splock->interrupt_mask) { /* 如果进锁前 IF==1,才重新开中断 */ enable_interrupts(); /* sti */ }}- 在获取锁前先保存中断状态并关闭中断
- 防止在临界区内被中断打断而导致死锁
- 释放锁时根据之前保存的中断状态决定是否重新开中断
- 保证”谁关谁开”,不会误把外层已关闭的中断重新打开
自旋锁适用于短临界区(持有锁时间非常短,自旋等待的开销小于任务切换)、中断上下文(不能睡眠的环境,中断处理程序不能调用 schedule)以及多核系统(自旋等待比睡眠更高效,因为可能很快在其他核上释放)。
自旋锁简单粗暴,但有一个问题:如果等待时间较长,自旋会浪费 CPU 时间。互斥锁改进了这一点——当锁不可用时,当前任务主动让出 CPU,进入阻塞状态,等锁释放时再被唤醒。
互斥锁(Mutex)
互斥锁在锁被占用时会让任务睡眠,而不是自旋等待。这对于持锁时间较长的场景更高效,可以避免 CPU 资源的浪费。互斥锁使用等待队列来管理阻塞的任务:
typedef struct mutex{ volatile uint32_t hold; // 0: 未锁定, 1: 锁定 volatile struct linked_list_node *thread_node; // 持有者(未来可用于调试) linked_list_t waiting_task_queue; // 等待队列 yieldlock_t ydlock; // 保护内部状态的锁} mutex_t;互斥锁的实现使用了 yieldlock(另一种锁机制)来保护内部状态:
/** * mutex_lock - 获取互斥锁 * @mp: 互斥锁对象 * * 如果锁被占用,将当前任务加入等待队列并阻塞 */void mutex_lock(mutex_t *mp){ while (1) { yieldlock_lock(&mp->ydlock);
if (atomic_exchange(&mp->hold, LOCKED_YES) == LOCKED_NO) { /* 成功获取锁 */ yieldlock_unlock(&mp->ydlock); return; }
/* 获取失败:将当前任务加入等待队列并阻塞 */ task_t *self = current_task(); linked_list_node_t *node = (linked_list_node_t *)kmalloc(sizeof(linked_list_node_t)); if (node != NULL) { node->ptr = (type_t)self; node->prev = NULL; node->next = NULL; linked_list_append(&mp->waiting_task_queue, node); }
yieldlock_unlock(&mp->ydlock);
/* 阻塞当前任务,schedule() 会切换到其他任务 */ task_block(self);
/* 被唤醒后重新尝试获取锁 */ }}
/** * mutex_unlock - 释放互斥锁 * @mp: 互斥锁对象 * * 如果有等待者,唤醒队列中的第一个 */void mutex_unlock(mutex_t *mp){ yieldlock_lock(&mp->ydlock);
mp->hold = LOCKED_NO; mp->thread_node = NULL;
/* 如果有等待者,唤醒第一个 */ if (mp->waiting_task_queue.size != 0) { linked_list_node_t *head = mp->waiting_task_queue.head; task_t *waiter = (task_t *)head->ptr; linked_list_remove(&mp->waiting_task_queue, head); kfree(head);
yieldlock_unlock(&mp->ydlock);
task_unblock(waiter); } else { yieldlock_unlock(&mp->ydlock); }}mutex_lock使用atomic_exchange尝试获取锁- 如果锁已被占用,将当前任务加入等待队列并调用
task_block阻塞 - 阻塞后的任务会被调度器切换出去,不会占用 CPU 资源
mutex_unlock检查是否有等待者,如果有则唤醒第一个等待者- 注意:锁会直接转移给等待者,而不是释放锁让等待者重新竞争
互斥锁只能保护互斥访问(0或1),但有时需要更灵活的同步——比如限制同时访问的资源数量。信号量通过计数器实现了这种能力。
信号量(Semaphore)
信号量是一个计数器,用于控制对资源的访问数量。它不仅可以用于互斥访问(信号量初始值为 1),还可以用于资源池管理(信号量初始值大于 1)。信号量通过 P 操作(wait)和 V 操作(signal)来管理资源:
typedef struct semaphore { volatile int count; // 可用资源数量 linked_list_t wait_queue; // 等待队列 spinlock_t lock; // 保护内部状态} semaphore_t;信号量的实现清晰地展示了 P/V 操作的语义:
/** * sem_init - 初始化信号量 * @sem: 信号量对象 * @initial_count: 初始资源数量 */void sem_init(semaphore_t *sem, int initial_count){ sem->count = initial_count; linked_list_init(&sem->wait_queue); spinlock_init(&sem->lock);}
/** * sem_wait - P 操作:等待/获取资源 * @sem: 信号量对象 * * 如果有资源可用,递减计数;否则阻塞当前任务 */void sem_wait(semaphore_t *sem){ spinlock_acquire(&sem->lock);
if (sem->count > 0) { /* 有资源可用,直接获取 */ sem->count--; spinlock_release(&sem->lock); return; }
/* count == 0:将当前任务加入等待队列并阻塞 */ task_t *self = current_task(); linked_list_node_t *node = (linked_list_node_t *)kmalloc(sizeof(linked_list_node_t)); if (node != NULL) { node->ptr = (type_t)self; node->prev = NULL; node->next = NULL; linked_list_append(&sem->wait_queue, node); }
spinlock_release(&sem->lock); task_block(self);}
/** * sem_signal - V 操作:释放/增加资源 * @sem: 信号量对象 * * 如果有等待者,唤醒一个;否则递增计数 */void sem_signal(semaphore_t *sem){ spinlock_acquire(&sem->lock);
if (sem->wait_queue.size > 0) { /* 有等待者,唤醒第一个 */ linked_list_node_t *head = sem->wait_queue.head; task_t *waiter = (task_t *)head->ptr; linked_list_remove(&sem->wait_queue, head); kfree(head);
spinlock_release(&sem->lock); task_unblock(waiter); } else { /* 没有等待者,增加资源计数 */ sem->count++; spinlock_release(&sem->lock); }}sem_wait(P 操作)首先检查count是否大于 0- 如果有资源可用,递减计数并立即返回
- 如果没有资源,将当前任务加入等待队列并阻塞
sem_signal(V 操作)检查是否有等待者- 如果有等待者,唤醒第一个等待者(不增加计数,资源直接转移)
- 如果没有等待者,递增计数表示资源增加
经典应用:生产者-消费者
信号量的经典应用场景是生产者-消费者问题:
semaphore_t empty; // 空槽位数量semaphore_t full; // 已填充槽位数量mutex_t mutex; // 保护缓冲区
// 初始化sem_init(&empty, BUFFER_SIZE); // 初始有 BUFFER_SIZE 个空槽位sem_init(&full, 0); // 初始没有已填充的槽位mutex_init(&mutex);
void producer(void) { while (1) { item = produce_item(); sem_wait(&empty); // 等待空槽位 mutex_lock(&mutex); buffer_put(item); mutex_unlock(&mutex); sem_post(&full); // 增加已填充槽位 }}
void consumer(void) { while (1) { sem_wait(&full); // 等待数据 mutex_lock(&mutex); item = buffer_get(); mutex_unlock(&mutex); sem_post(&empty); // 增加空槽位 consume_item(item); }}empty信号量控制缓冲区的空槽位数量full信号量控制缓冲区的已填充槽位数量- 生产者等待空槽位,消费者等待已填充槽位
- 互斥锁保护缓冲区的实际读写操作
- 这种模式确保了生产者不会写入已满的缓冲区,消费者不会读取空的缓冲区
代码实现
文件结构
11.kernel-sync/├── boot/│ ├── mbr.S│ └── loader.S├── kernel/│ ├── include/│ │ ├── spinlock.h # 自旋锁接口定义│ │ ├── mutex.h # 互斥锁接口定义│ │ ├── semaphore.h # 信号量接口定义│ │ ├── yieldlock.h # yieldlock 接口│ │ └── ...│ ├── sync/│ │ ├── spinlock.c # 自旋锁实现(使用 atomic_exchange)│ │ ├── spinlock.S # 自旋锁汇编辅助函数│ │ ├── mutex.c # 互斥锁实现(带等待队列)│ │ ├── semaphore.c # 信号量实现(P/V 操作)│ │ └── yieldlock.c # yieldlock 实现│ └── kernel.c # 主内核函数,包含测试代码└── Makefile自旋锁获取流程
互斥锁阻塞唤醒流程
三种同步原语的对比
| 特性 | 自旋锁(Spinlock) | 互斥锁(Mutex) | 信号量(Semaphore) |
|---|---|---|---|
| 等待方式 | 忙等待(自旋) | 阻塞(睡眠) | 阻塞(睡眠) |
| 适用场景 | 短临界区、中断上下文 | 长临界区、普通任务 | 资源计数、生产者-消费者 |
| CPU 开销 | 高(自旋时占用 CPU) | 低(阻塞时不占用 CPU) | 低(阻塞时不占用 CPU) |
| 资源数 | 1(互斥) | 1(互斥) | N(可配置) |
| 实现复杂度 | 简单(原子操作) | 中等(等待队列) | 中等(计数+队列) |
| 中断安全 | 需要特殊版本 | 不适用于中断 | 不适用于中断 |
运行与验证
编译运行
cd 11.kernel-syncmake cleanmake allmake run预期输出
Testing spinlock...Counter without lock: 98234 (expected: 100000) <- Race condition!Counter with spinlock: 100000 (expected: 100000) <- Correct!
Testing mutex...Task A trying to acquire mutex...Task A acquired mutexTask B trying to acquire mutex...Task A releasing mutex...Task B acquired mutexTask B releasing mutex...
Testing semaphore (producer-consumer)...Producer: produced item 0Consumer: consumed item 0Producer: produced item 1Consumer: consumed item 1...Producer: produced item 99Consumer: consumed item 99All tests passed!测试要点
- 自旋锁测试:验证无锁情况下的竞态条件和有锁情况下的正确性
- 互斥锁测试:验证任务的正确阻塞和唤醒
- 信号量测试:验证生产者-消费者模式的数据正确性
踩坑记录
1. 死锁(Deadlock)
问题原因:任务A持有锁1等待锁2,任务B持有锁2等待锁1
// 错误示例void task_a(void) { mutex_lock(&lock1); mutex_lock(&lock2); /* ... */ mutex_unlock(&lock2); mutex_unlock(&lock1);}
void task_b(void) { mutex_lock(&lock2); // 锁的获取顺序与 task_a 不同! mutex_lock(&lock1); /* ... */ mutex_unlock(&lock1); mutex_unlock(&lock2);}解决方案:统一锁的获取顺序,所有任务都以相同的顺序获取锁
// 正确示例:所有任务都先获取 lock1,再获取 lock2void task_a(void) { mutex_lock(&lock1); mutex_lock(&lock2); /* ... */ mutex_unlock(&lock2); mutex_unlock(&lock1);}
void task_b(void) { mutex_lock(&lock1); // 与 task_a 相同的顺序 mutex_lock(&lock2); /* ... */ mutex_unlock(&lock2); mutex_unlock(&lock1);}2. 优先级反转(Priority Inversion)
问题原因:低优先级任务持有锁,高优先级任务等待,中优先级任务抢占低优先级任务
解决方案:优先级继承协议,当高优先级任务等待低优先级任务持有的锁时,临时提升低优先级任务的优先级
3. 中断中的死锁
问题原因:中断处理程序尝试获取已被任务持有的锁,导致死锁
解决方案:使用 spinlock_lock_irqsave 和 spinlock_unlock_irqrestore,在获取锁前先关闭中断
// 错误示例void interrupt_handler(void) { spinlock_lock(&lock); // 如果任务A持有这个锁,会死锁! /* ... */ spinlock_unlock(&lock);}
// 正确示例void interrupt_handler(void) { spinlock_lock_irqsave(&lock); // 关中断再抢锁 /* ... */ spinlock_unlock_irqrestore(&lock); // 恢复中断状态}小结
自旋锁用原子交换实现忙等待,适合短临界区和中断上下文;互斥锁在锁不可用时阻塞任务并加入等待队列,适合长临界区;信号量通过计数器管理资源访问数量,P/V 操作天然适配生产者-消费者这类需要协调资源供需的场景。三种原语从简单到复杂,覆盖了内核中绝大部分同步需求。阻塞和唤醒机制(task_block / task_unblock)是互斥锁和信号量的共同基础,让等待中的任务释放 CPU 而非空转。下一章将进入用户空间,实现特权级切换和系统调用框架,届时中断安全的自旋锁会再次出场。
参考
- Spinlocks vs Mutexes - Linux 内核文档
- Dining Philosophers Problem - 哲学家就餐问题
支持与分享
如果这篇文章对你有帮助,欢迎支持作者或分享给更多人
部分信息可能已经过时






