8 个线程争一把锁,99% 的时间在等锁、1% 的时间在干活——这不是夸张,这是高并发场景的真实写照。锁的开销不只是线程阻塞:每次加锁/解锁都触发内核态切换,锁的缓存行在核心间弹跳(缓存一致性),等待队列的维护本身也需要锁。无锁编程用原子指令(CAS、FAA)替代锁,把并发控制从”排队等”变成”抢到了就干、抢不到就重试”——但这条路有自己的陷阱。
一、为什么需要无锁编程
1.1 锁的问题
| 锁的开销 | 典型值 | 说明 |
|---|---|---|
| 互斥锁(无竞争) | ~25 ns | 原子 CAS + 分支预测 |
| 互斥锁(有竞争) | ~1-10 μs | 上下文切换 + 内核调度 |
| 自旋锁(无竞争) | ~10 ns | 原子 CAS |
| 自旋锁(有竞争) | ~100 ns-1 μs | 忙等待,不切换上下文 |
| 原子操作 | ~5-10 ns | 硬件 CAS/FAA |
无竞争的互斥锁很快(~25 ns),但一旦发生竞争,上下文切换的开销就飙升到微秒级。对于高频更新的共享计数器(如 QPS 统计),锁的开销可能比实际工作还大。
二、硬件原子原语
2.1 CAS(Compare-And-Swap)
// CAS 语义:原子地比较并交换// 如果 *addr == expected,则 *addr = desired,返回 true// 否则返回 false,*addr 不变bool cas(int* addr, int expected, int desired);
// x86 实现:LOCK CMPXCHG 指令inline bool cas(volatile int* addr, int expected, int desired) { int result; __asm__ __volatile__ ( "lock cmpxchgl %3, %1\n\t" "sete %0" : "=r" (result), "+m" (*addr) : "a" (expected), "r" (desired) : "cc", "memory" ); return result;}2.2 FAA(Fetch-And-Add)
// FAA 语义:原子地增加并返回旧值// old = *addr; *addr += value; return old;int faa(int* addr, int value);
// x86 实现:LOCK XADD 指令inline int faa(volatile int* addr, int value) { __asm__ __volatile__ ( "lock xaddl %0, %1" : "+r" (value), "+m" (*addr) : : "cc", "memory" ); return value; // 返回旧值}2.3 原子操作对比
| 操作 | x86 指令 | 开销 | 用途 |
|---|---|---|---|
| CAS | LOCK CMPXCHG | ~5-10 ns | 条件更新、无锁算法基础 |
| FAA | LOCK XADD | ~5-10 ns | 计数器、序列号 |
| Exchange | LOCK XCHG | ~5-10 ns | 无条件交换 |
| Load | MOV | ~1 ns | 原子读取(x86 天然原子) |
| Store | MOV | ~1 ns | 原子写入(需配合内存屏障) |
2.4 ARM LL/SC 原子指令
与 x86 的 LOCK 前缀不同,ARM 采用 Load-Linked/Store-Conditional(LL/SC)模式实现原子操作。LDXR 将内存值加载到寄存器并标记该地址,STXR 只有在标记仍有效时才写入成功:
// ARM64: 原子 CAS 的 LL/SC 实现// LDXR: 独占加载,标记地址// STXR: 独占存储,返回 0 表示成功cas: ldxr w0, [x1] // 独占加载 *addr → w0 cmp w0, w2 // 比较 w0 == expected? b.ne fail stxr w3, w4, [x1] // 独占存储 desired → *addr cbnz w3, cas // 如果失败(被其他核心修改),重试fail: ret| 维度 | x86 LOCK 前缀 | ARM LL/SC |
|---|---|---|
| 语义 | 总线锁定/缓存行锁定 | 乐观并发,失败重试 |
| 粒度 | 一次操作完成 | 加载和存储分开 |
| 竞争时 | 硬件串行化 | 软件重试循环 |
| 代表指令 | LOCK CMPXCHG | LDXR/STXR |
LL/SC 的优势在于不需要总线锁——它通过地址标记实现”检测-提交”的乐观并发,在低竞争场景下更高效。但在高竞争时,STXR 频繁失败会导致重试风暴。ARMv8.1 引入了 LSE(Large System Extensions)原子指令(如 CASAL),在多核系统上性能更好。
三、C++ 原子操作
3.1 std::atomic 基础
#include <atomic>
std::atomic<int> counter{0};
// 原子递增counter.fetch_add(1, std::memory_order_relaxed);
// 原子读取int val = counter.load(std::memory_order_acquire);
// 原子写入counter.store(42, std::memory_order_release);
// CAS 操作int expected = 42;bool success = counter.compare_exchange_strong( expected, 100, std::memory_order_acq_rel, std::memory_order_acquire);3.2 内存序详解
| 内存序 | 含义 | 开销 | 使用场景 |
|---|---|---|---|
relaxed | 无顺序保证 | 最快 | 计数器、统计 |
acquire | 后续读不能重排到此之前 | 中 | 读取同步变量 |
release | 前面写不能重排到此之后 | 中 | 写入同步变量 |
acq_rel | acquire + release | 中 | 读写同步变量 |
seq_cst | 全局顺序一致 | 最慢 | 默认,最安全 |
// 经典的 acquire-release 模式std::atomic<bool> ready{false};int data = 0;
// 生产者线程void producer() { data = 42; // 写数据 ready.store(true, std::memory_order_release); // 释放:保证上面的写可见}
// 消费者线程void consumer() { while (!ready.load(std::memory_order_acquire)) {} // 获取:保证看到 release 之前的写 assert(data == 42); // 一定成立!}四、缓存行弹跳
4.1 伪共享
// 伪共享:两个计数器在同一缓存行struct BadCounters { std::atomic<int> a; // 线程 1 频繁更新 std::atomic<int> b; // 线程 2 频繁更新 // a 和 b 可能在同一 64 字节缓存行 // 线程 1 更新 a → 使线程 2 的缓存行失效 // 线程 2 更新 b → 使线程 1 的缓存行失效 // 反复弹跳,性能下降 10-50x};
// 缓存行对齐:消除伪共享struct GoodCounters { alignas(64) std::atomic<int> a; // 独占缓存行 alignas(64) std::atomic<int> b; // 独占缓存行};4.2 检测伪共享
# 使用 perf 检测缓存行弹跳perf stat -e cache-misses,cycles,L1-dcache-load-misses \ ./counter_benchmark
# 使用 perf c2c 检测伪共享perf c2c record ./counter_benchmarkperf c2c report# 输出会显示哪些缓存行被多个线程修改五、退避策略
5.1 指数退避
// CAS 失败时的指数退避void atomic_add(std::atomic<int>& counter, int value) { int backoff = 1; int old = counter.load(std::memory_order_relaxed); while (!counter.compare_exchange_weak( old, old + value, std::memory_order_relaxed)) { // 退避:减少缓存行争用 for (int i = 0; i < backoff; i++) { _mm_pause(); // x86 PAUSE 指令,约 40 ns } backoff = std::min(backoff * 2, 64); // 上限 64 }}5.2 退避策略对比
| 策略 | 低竞争 | 高竞争 | 说明 |
|---|---|---|---|
| 无退避 | 快 | 慢(活锁风险) | CAS 失败立即重试 |
| 固定退避 | 中 | 中 | 固定次数 PAUSE |
| 指数退避 | 快 | 较好 | 逐步增加等待时间 |
| 自适应 | 最优 | 最优 | 根据竞争程度动态调整 |
六、无锁数据结构
6.1 无锁计数器
// Go: 无锁计数器type AtomicCounter struct { value int64}
func (c *AtomicCounter) Increment() { atomic.AddInt64(&c.value, 1)}
func (c *AtomicCounter) Get() int64 { return atomic.LoadInt64(&c.value)}
// Per-CPU 计数器:消除缓存行争用type PerCPUCounter struct { counters []int64 // 每个 CPU 一个计数器,缓存行对齐 padding [64]byte // 防止伪共享}
func (c *PerCPUCounter) Increment() { cpu := runtime.GetCPU() // 获取当前 CPU ID atomic.AddInt64(&c.counters[cpu], 1)}
func (c *PerCPUCounter) Get() int64 { var total int64 for i := range c.counters { total += atomic.LoadInt64(&c.counters[i]) } return total}6.2 Michael-Scott 无锁队列
// Michael-Scott 无锁队列(简化版)template<typename T>class LockFreeQueue { struct Node { T data; std::atomic<Node*> next{nullptr}; };
std::atomic<Node*> head; std::atomic<Node*> tail;
public: LockFreeQueue() { Node* sentinel = new Node{}; head.store(sentinel); tail.store(sentinel); }
void enqueue(T value) { Node* node = new Node{value}; Node* old_tail = tail.load(std::memory_order_acquire);
while (true) { Node* next = old_tail->next.load(std::memory_order_acquire); if (next == nullptr) { // 尝试链接新节点 Node* null = nullptr; if (old_tail->next.compare_exchange_strong( null, node, std::memory_order_release, std::memory_order_relaxed)) { // 成功,尝试推进 tail tail.compare_exchange_strong( old_tail, node, std::memory_order_release, std::memory_order_relaxed); return; } } else { // tail 落后了,帮助推进 tail.compare_exchange_strong( old_tail, next, std::memory_order_release, std::memory_order_relaxed); } old_tail = tail.load(std::memory_order_acquire); } }};6.3 Treiber 无锁栈
Treiber 栈是最经典的无锁数据结构之一,push 和 pop 都通过 CAS 操作完成:
// Treiber 无锁栈(简化版)template<typename T>class LockFreeStack { struct Node { T data; Node* next; };
std::atomic<Node*> head{nullptr};
public: void push(T value) { Node* node = new Node{value, head.load()}; while (!head.compare_exchange_strong(node->next, node, std::memory_order_release, std::memory_order_relaxed)) { // CAS 失败:head 已被其他线程修改,重试 } }
bool pop(T& result) { Node* old_head = head.load(std::memory_order_acquire); while (old_head && !head.compare_exchange_strong( old_head, old_head->next, std::memory_order_acquire, std::memory_order_relaxed)) { // CAS 失败:head 已被其他线程修改 } if (!old_head) return false; // 栈空 result = old_head->data; // 这里存在 ABA 问题:old_head 可能已被其他线程 pop 后又 push 回来 delete old_head; // 需要 Hazard Pointer 或版本号保护 return true; }};Treiber 栈的 pop 操作天然面临 ABA 问题——一个节点被 pop 后可能立即被 push 回来,导致另一个线程的 CAS 误判。实际实现中需要配合 Hazard Pointer 或带版本号的 TaggedPointer 来安全释放节点。
6.4 ABA 问题
// ABA 问题:CAS 无法区分值的变化历史// 1. 线程 1 读取 *addr = A// 2. 线程 2 将 *addr 从 A 改为 B// 3. 线程 2 将 *addr 从 B 改回 A// 4. 线程 1 执行 CAS(A, C) → 成功!但数据已被修改过
// 解决方案 1:版本号struct TaggedPointer { void* ptr; uint64_t tag; // 每次修改递增};
// 解决方案 2:Hazard Pointer// 在访问共享节点前,先注册 hazard pointer// 其他线程在释放节点前检查是否有 hazard pointer 指向它
// 解决方案 3:RCU(Read-Copy-Update)// 读者无锁访问旧版本// 写者创建新版本,通过 RCU 机制等待所有读者切换ABA 问题是无锁算法的经典陷阱。在 64 位系统上,可以将指针和版本号打包到一个 64 位原子变量中(低 48 位指针 + 高 16 位版本号),利用版本号检测 ABA。
七、RCU(Read-Copy-Update)
7.1 RCU 原理
// Linux 内核 RCU 使用// 读者:无锁访问rcu_read_lock();struct data* d = rcu_dereference(global_ptr);// 使用 d...rcu_read_unlock();
// 写者:创建新版本并替换struct data* new_data = kmalloc(sizeof(*new_data), GFP_KERNEL);*new_data = *old_data;new_data->field = new_value;rcu_assign_pointer(global_ptr, new_data);synchronize_rcu(); // 等待宽限期kfree(old_data);7.2 硬件内存屏障详解
Store Buffer 和 Invalidate Queue 的交互是理解内存屏障的关键。在第 8 章中讨论了 x86 TSO 模型,这里从硬件角度深入看屏障的实际效果:
| 屏障类型 | x86 指令 | ARM 指令 | 作用 |
|---|---|---|---|
| 写屏障(smp_wmb) | sfence | DMB ST | 排空 Store Buffer |
| 读屏障(smp_rmb) | lfence | DMB LD | 处理 Invalidate Queue |
| 全屏障(smp_mb) | mfence | DMB SY | 排空 SB + 处理 IQ |
x86 的 TSO 模型只允许 Store-Load 重排,因此 smp_wmb 和 smp_rmb 在 x86 上是空操作(no-op),只有 smp_mb 编译为 mfence。ARM 的弱序模型允许所有四种重排,每种屏障都必须编译为对应的 DMB 指令。
ARM 的内存模型比 x86 弱得多——ARM 允许 Store-Store、Load-Load、Store-Load、Load-Store 四种重排,而 x86 TSO 只允许 Store-Load 重排。在 ARM 上编写无锁代码时,必须显式添加内存屏障;同样的代码在 x86 上可能碰巧正确,但在 ARM 上会出 bug。跨平台无锁代码必须使用 C++ std::atomic 的内存序参数而非裸屏障指令。
7.3 Hazard Pointer
Hazard Pointer 是解决无锁数据结构中节点安全释放的机制。每个线程维护一组”危险指针”,在访问共享节点前先注册:
// Hazard Pointer 示意(简化版)// 每个线程拥有 MAX_HAZARD 个危险指针槽位constexpr int MAX_HAZARD = 2;std::atomic<void*> hazard_ptrs[MAX_THREADS][MAX_HAZARD];
// 读者:注册 hazard pointervoid* node = head.load();hazard_ptrs[my_thread][0].store(node); // 声明"我正在用 node"// ... 使用 node ...hazard_ptrs[my_thread][0].store(nullptr); // 用完了,释放声明
// 写者:删除节点前检查void retire_node(void* node) { retired_list.push(node); if (retired_list.size() > threshold) { scan_retired(); // 遍历所有线程的 hazard pointer // 只释放没有任何 hazard pointer 指向的节点 }}| 维度 | Hazard Pointer | RCU |
|---|---|---|
| 读者开销 | 1-2 次原子 store | 仅禁用/启用抢占 |
| 写者开销 | 扫描所有线程的 HP | 等待宽限期 |
| 延迟 | 立即回收(无宽限期) | 宽限期后才回收 |
| 适用场景 | 无锁栈/队列 | 读多写极少的链表 |
7.4 Linux Per-CPU 变量
Linux 内核大量使用 Per-CPU 变量来消除缓存行弹跳——每个 CPU 拥有变量的独立副本,更新时无需跨核 Invalidate:
// 定义 Per-CPU 变量DEFINE_PER_CPU(int, my_counter); // 每个 CPU 一个 int 副本
// 更新:只访问本 CPU 的副本,无需原子操作this_cpu_inc(my_counter); // 等价于 my_counter[cpu]++,无缓存行弹跳
// 读取全局值:需要遍历所有 CPU 的副本int total = 0;for_each_possible_cpu(cpu) { total += per_cpu(my_counter, cpu);}Per-CPU 变量之所以能避免缓存行弹跳,是因为每个 CPU 的副本位于不同的缓存行——更新操作只涉及本地缓存行(M 状态),不需要发送 Invalidate 消息。代价是读取全局值需要遍历所有 CPU 的副本,但统计场景下读远少于写,这是可接受的权衡。
7.5 Userspace RCU(liburcu)
Linux 内核的 RCU 机制无法直接在用户态使用。liburcu(Userspace RCU)提供了等效的用户态实现,被 dpdk、glusterfs 等项目采用:
#include <urcu.h>#include <urcu-pointer.h>
struct data* global_ptr;
// 读者void reader() { rcu_read_lock(); struct data* d = rcu_dereference(global_ptr); // 使用 d... 无锁,零开销 rcu_read_unlock();}
// 写者void writer(struct data* new_data) { struct data* old = rcu_xchg_pointer(&global_ptr, new_data); synchronize_rcu(); // 等待所有读者退出 free(old);}liburcu 提供多种变体:QSBR(最轻量,需读者显式汇报静止状态)、MB(基于内存屏障,通用但较慢)、Signal(基于信号强制静止状态)。高性能场景推荐 QSBR 变体。
7.6 无锁哈希表
无锁哈希表的实现比队列和栈复杂得多。主流方案包括:
| 方案 | 核心思路 | 复杂度 |
|---|---|---|
| 分片锁哈希表 | 每个桶独立加锁 | 低,但非严格无锁 |
| Lock-coupon(Herlihy) | CAS 逐桶推进 | 高,理论意义 |
| Folklore 无锁哈希 | 开放寻址 + CAS 插入 | 中,实践中常用 |
生产环境通常选择分片锁方案——将哈希表分为 N 个桶组,每组独立加锁,在并发度和实现复杂度之间取得平衡。严格的无锁哈希表实现复杂且 ABA 问题严重,目前仅在学术研究中使用。
八、何时使用无锁
| 场景 | 推荐 | 原因 |
|---|---|---|
| 共享计数器 | 原子操作 | FAA 最快,无竞争风险 |
| 单生产者单消费者队列 | 无锁队列 | 避免锁开销 |
| 高频更新的统计 | Per-CPU | 消除缓存行争用 |
| 低竞争的共享数据 | 自旋锁 | 简单,开销小 |
| 复杂的数据结构修改 | 互斥锁 | 无锁实现复杂,ABA 风险 |
| 需要等待的操作 | 互斥锁 | 自旋浪费 CPU |
| 代码可读性优先 | 互斥锁 | 锁更易理解和维护 |
九、总结
上一章深入探讨了数据导向设计。
| 技术 | 开销 | 复杂度 | 适用场景 |
|---|---|---|---|
| 互斥锁 | 25 ns - 10 μs | 低 | 通用并发 |
| 自旋锁 | 10 ns - 1 μs | 低 | 短临界区 |
| 原子操作 | 5-10 ns | 中 | 计数器、标志 |
| 无锁队列 | 10-50 ns | 高 | SPSC/MPSC 队列 |
| RCU | 读: 0 ns, 写: ms | 高 | 读多写极少 |
无锁编程的黄金法则:先用锁,性能不够时再考虑无锁。无锁代码更难写、更难验证、更难维护。只有在性能分析证明锁是瓶颈时,才值得投入无锁优化的复杂度。
支持与分享
如果这篇文章对你有帮助,欢迎支持作者或分享给更多人
部分信息可能已经过时






