mobile wallpaper 1mobile wallpaper 2mobile wallpaper 3mobile wallpaper 4
2237 字
6 分钟
无锁编程与原子操作
2026-03-22

8 个线程争一把锁,99% 的时间在等锁、1% 的时间在干活——这不是夸张,这是高并发场景的真实写照。锁的开销不只是线程阻塞:每次加锁/解锁都触发内核态切换,锁的缓存行在核心间弹跳(缓存一致性),等待队列的维护本身也需要锁。无锁编程用原子指令(CAS、FAA)替代锁,把并发控制从”排队等”变成”抢到了就干、抢不到就重试”——但这条路有自己的陷阱。

一、为什么需要无锁编程#

1.1 锁的问题#

graph TB subgraph "互斥锁的问题" A["线程 A 获取锁"] --> B["线程 B 等待<br/>上下文切换"] B --> C["线程 A 释放锁"] C --> D["内核唤醒线程 B<br/>系统调用"] E["优先级反转<br/>低优先级线程持锁<br/>高优先级线程等待"] F["死锁风险<br/>锁获取顺序不一致"] end style B fill:#ffcdd2,stroke:#c62828 style D fill:#ffcdd2,stroke:#c62828 style E fill:#ffcdd2,stroke:#c62828 style F fill:#ffcdd2,stroke:#c62828
锁的开销典型值说明
互斥锁(无竞争)~25 ns原子 CAS + 分支预测
互斥锁(有竞争)~1-10 μs上下文切换 + 内核调度
自旋锁(无竞争)~10 ns原子 CAS
自旋锁(有竞争)~100 ns-1 μs忙等待,不切换上下文
原子操作~5-10 ns硬件 CAS/FAA
Note

无竞争的互斥锁很快(~25 ns),但一旦发生竞争,上下文切换的开销就飙升到微秒级。对于高频更新的共享计数器(如 QPS 统计),锁的开销可能比实际工作还大。

二、硬件原子原语#

2.1 CAS(Compare-And-Swap)#

// CAS 语义:原子地比较并交换
// 如果 *addr == expected,则 *addr = desired,返回 true
// 否则返回 false,*addr 不变
bool cas(int* addr, int expected, int desired);
// x86 实现:LOCK CMPXCHG 指令
inline bool cas(volatile int* addr, int expected, int desired) {
int result;
__asm__ __volatile__ (
"lock cmpxchgl %3, %1\n\t"
"sete %0"
: "=r" (result), "+m" (*addr)
: "a" (expected), "r" (desired)
: "cc", "memory"
);
return result;
}

2.2 FAA(Fetch-And-Add)#

// FAA 语义:原子地增加并返回旧值
// old = *addr; *addr += value; return old;
int faa(int* addr, int value);
// x86 实现:LOCK XADD 指令
inline int faa(volatile int* addr, int value) {
__asm__ __volatile__ (
"lock xaddl %0, %1"
: "+r" (value), "+m" (*addr)
:
: "cc", "memory"
);
return value; // 返回旧值
}

2.3 原子操作对比#

操作x86 指令开销用途
CASLOCK CMPXCHG~5-10 ns条件更新、无锁算法基础
FAALOCK XADD~5-10 ns计数器、序列号
ExchangeLOCK XCHG~5-10 ns无条件交换
LoadMOV~1 ns原子读取(x86 天然原子)
StoreMOV~1 ns原子写入(需配合内存屏障)

2.4 ARM LL/SC 原子指令#

与 x86 的 LOCK 前缀不同,ARM 采用 Load-Linked/Store-Conditional(LL/SC)模式实现原子操作。LDXR 将内存值加载到寄存器并标记该地址,STXR 只有在标记仍有效时才写入成功:

// ARM64: 原子 CAS 的 LL/SC 实现
// LDXR: 独占加载,标记地址
// STXR: 独占存储,返回 0 表示成功
cas:
ldxr w0, [x1] // 独占加载 *addr → w0
cmp w0, w2 // 比较 w0 == expected?
b.ne fail
stxr w3, w4, [x1] // 独占存储 desired → *addr
cbnz w3, cas // 如果失败(被其他核心修改),重试
fail:
ret
维度x86 LOCK 前缀ARM LL/SC
语义总线锁定/缓存行锁定乐观并发,失败重试
粒度一次操作完成加载和存储分开
竞争时硬件串行化软件重试循环
代表指令LOCK CMPXCHGLDXR/STXR

LL/SC 的优势在于不需要总线锁——它通过地址标记实现”检测-提交”的乐观并发,在低竞争场景下更高效。但在高竞争时,STXR 频繁失败会导致重试风暴。ARMv8.1 引入了 LSE(Large System Extensions)原子指令(如 CASAL),在多核系统上性能更好。

三、C++ 原子操作#

3.1 std::atomic 基础#

#include <atomic>
std::atomic<int> counter{0};
// 原子递增
counter.fetch_add(1, std::memory_order_relaxed);
// 原子读取
int val = counter.load(std::memory_order_acquire);
// 原子写入
counter.store(42, std::memory_order_release);
// CAS 操作
int expected = 42;
bool success = counter.compare_exchange_strong(
expected, 100,
std::memory_order_acq_rel,
std::memory_order_acquire
);

3.2 内存序详解#

graph TB RELAXED["relaxed<br/>无顺序保证<br/>最快"] --> ACQ_REL["acq_rel<br/>获取+释放<br/>中等"] ACQUIRE["acquire<br/>获取语义<br/>读操作"] --> ACQ_REL RELEASE["release<br/>释放语义<br/>写操作"] --> ACQ_REL ACQ_REL --> SEQ_CST["seq_cst<br/>顺序一致<br/>最慢"] style RELAXED fill:#c8e6c9,stroke:#2e7d32 style SEQ_CST fill:#ffcdd2,stroke:#c62828
内存序含义开销使用场景
relaxed无顺序保证最快计数器、统计
acquire后续读不能重排到此之前读取同步变量
release前面写不能重排到此之后写入同步变量
acq_relacquire + release读写同步变量
seq_cst全局顺序一致最慢默认,最安全
// 经典的 acquire-release 模式
std::atomic<bool> ready{false};
int data = 0;
// 生产者线程
void producer() {
data = 42; // 写数据
ready.store(true, std::memory_order_release); // 释放:保证上面的写可见
}
// 消费者线程
void consumer() {
while (!ready.load(std::memory_order_acquire)) {} // 获取:保证看到 release 之前的写
assert(data == 42); // 一定成立!
}

四、缓存行弹跳#

4.1 伪共享#

// 伪共享:两个计数器在同一缓存行
struct BadCounters {
std::atomic<int> a; // 线程 1 频繁更新
std::atomic<int> b; // 线程 2 频繁更新
// a 和 b 可能在同一 64 字节缓存行
// 线程 1 更新 a → 使线程 2 的缓存行失效
// 线程 2 更新 b → 使线程 1 的缓存行失效
// 反复弹跳,性能下降 10-50x
};
// 缓存行对齐:消除伪共享
struct GoodCounters {
alignas(64) std::atomic<int> a; // 独占缓存行
alignas(64) std::atomic<int> b; // 独占缓存行
};

4.2 检测伪共享#

# 使用 perf 检测缓存行弹跳
perf stat -e cache-misses,cycles,L1-dcache-load-misses \
./counter_benchmark
# 使用 perf c2c 检测伪共享
perf c2c record ./counter_benchmark
perf c2c report
# 输出会显示哪些缓存行被多个线程修改

五、退避策略#

5.1 指数退避#

// CAS 失败时的指数退避
void atomic_add(std::atomic<int>& counter, int value) {
int backoff = 1;
int old = counter.load(std::memory_order_relaxed);
while (!counter.compare_exchange_weak(
old, old + value,
std::memory_order_relaxed)) {
// 退避:减少缓存行争用
for (int i = 0; i < backoff; i++) {
_mm_pause(); // x86 PAUSE 指令,约 40 ns
}
backoff = std::min(backoff * 2, 64); // 上限 64
}
}

5.2 退避策略对比#

策略低竞争高竞争说明
无退避慢(活锁风险)CAS 失败立即重试
固定退避固定次数 PAUSE
指数退避较好逐步增加等待时间
自适应最优最优根据竞争程度动态调整

六、无锁数据结构#

6.1 无锁计数器#

// Go: 无锁计数器
type AtomicCounter struct {
value int64
}
func (c *AtomicCounter) Increment() {
atomic.AddInt64(&c.value, 1)
}
func (c *AtomicCounter) Get() int64 {
return atomic.LoadInt64(&c.value)
}
// Per-CPU 计数器:消除缓存行争用
type PerCPUCounter struct {
counters []int64 // 每个 CPU 一个计数器,缓存行对齐
padding [64]byte // 防止伪共享
}
func (c *PerCPUCounter) Increment() {
cpu := runtime.GetCPU() // 获取当前 CPU ID
atomic.AddInt64(&c.counters[cpu], 1)
}
func (c *PerCPUCounter) Get() int64 {
var total int64
for i := range c.counters {
total += atomic.LoadInt64(&c.counters[i])
}
return total
}

6.2 Michael-Scott 无锁队列#

// Michael-Scott 无锁队列(简化版)
template<typename T>
class LockFreeQueue {
struct Node {
T data;
std::atomic<Node*> next{nullptr};
};
std::atomic<Node*> head;
std::atomic<Node*> tail;
public:
LockFreeQueue() {
Node* sentinel = new Node{};
head.store(sentinel);
tail.store(sentinel);
}
void enqueue(T value) {
Node* node = new Node{value};
Node* old_tail = tail.load(std::memory_order_acquire);
while (true) {
Node* next = old_tail->next.load(std::memory_order_acquire);
if (next == nullptr) {
// 尝试链接新节点
Node* null = nullptr;
if (old_tail->next.compare_exchange_strong(
null, node,
std::memory_order_release,
std::memory_order_relaxed)) {
// 成功,尝试推进 tail
tail.compare_exchange_strong(
old_tail, node,
std::memory_order_release,
std::memory_order_relaxed);
return;
}
} else {
// tail 落后了,帮助推进
tail.compare_exchange_strong(
old_tail, next,
std::memory_order_release,
std::memory_order_relaxed);
}
old_tail = tail.load(std::memory_order_acquire);
}
}
};

6.3 Treiber 无锁栈#

Treiber 栈是最经典的无锁数据结构之一,push 和 pop 都通过 CAS 操作完成:

// Treiber 无锁栈(简化版)
template<typename T>
class LockFreeStack {
struct Node {
T data;
Node* next;
};
std::atomic<Node*> head{nullptr};
public:
void push(T value) {
Node* node = new Node{value, head.load()};
while (!head.compare_exchange_strong(node->next, node,
std::memory_order_release, std::memory_order_relaxed)) {
// CAS 失败:head 已被其他线程修改,重试
}
}
bool pop(T& result) {
Node* old_head = head.load(std::memory_order_acquire);
while (old_head && !head.compare_exchange_strong(
old_head, old_head->next,
std::memory_order_acquire, std::memory_order_relaxed)) {
// CAS 失败:head 已被其他线程修改
}
if (!old_head) return false; // 栈空
result = old_head->data;
// 这里存在 ABA 问题:old_head 可能已被其他线程 pop 后又 push 回来
delete old_head; // 需要 Hazard Pointer 或版本号保护
return true;
}
};

Treiber 栈的 pop 操作天然面临 ABA 问题——一个节点被 pop 后可能立即被 push 回来,导致另一个线程的 CAS 误判。实际实现中需要配合 Hazard Pointer 或带版本号的 TaggedPointer 来安全释放节点。

6.4 ABA 问题#

// ABA 问题:CAS 无法区分值的变化历史
// 1. 线程 1 读取 *addr = A
// 2. 线程 2 将 *addr 从 A 改为 B
// 3. 线程 2 将 *addr 从 B 改回 A
// 4. 线程 1 执行 CAS(A, C) → 成功!但数据已被修改过
// 解决方案 1:版本号
struct TaggedPointer {
void* ptr;
uint64_t tag; // 每次修改递增
};
// 解决方案 2:Hazard Pointer
// 在访问共享节点前,先注册 hazard pointer
// 其他线程在释放节点前检查是否有 hazard pointer 指向它
// 解决方案 3:RCU(Read-Copy-Update)
// 读者无锁访问旧版本
// 写者创建新版本,通过 RCU 机制等待所有读者切换
Warning

ABA 问题是无锁算法的经典陷阱。在 64 位系统上,可以将指针和版本号打包到一个 64 位原子变量中(低 48 位指针 + 高 16 位版本号),利用版本号检测 ABA。

七、RCU(Read-Copy-Update)#

7.1 RCU 原理#

sequenceDiagram participant R as 读者 participant W as 写者 participant D as 数据 R->>D: 读取旧版本(无锁) W->>D: 创建新版本(copy) W->>D: 原子替换指针(update) Note over R: 读者继续读旧版本 W->>W: 等待宽限期(grace period) Note over R: 所有读者退出临界区 W->>D: 释放旧版本
// Linux 内核 RCU 使用
// 读者:无锁访问
rcu_read_lock();
struct data* d = rcu_dereference(global_ptr);
// 使用 d...
rcu_read_unlock();
// 写者:创建新版本并替换
struct data* new_data = kmalloc(sizeof(*new_data), GFP_KERNEL);
*new_data = *old_data;
new_data->field = new_value;
rcu_assign_pointer(global_ptr, new_data);
synchronize_rcu(); // 等待宽限期
kfree(old_data);

7.2 硬件内存屏障详解#

Store Buffer 和 Invalidate Queue 的交互是理解内存屏障的关键。在第 8 章中讨论了 x86 TSO 模型,这里从硬件角度深入看屏障的实际效果:

graph TB subgraph CoreA["核心 A"] A_SB["Store Buffer<br/>data=42, flag=1"] A_CACHE["L1 缓存"] end subgraph CoreB["核心 B"] B_IQ["Invalidate Queue<br/>待处理的 Invalidate"] B_CACHE["L1 缓存"] end A_SB -->|"smp_wmb()<br/>排空 Store Buffer"| A_CACHE A_CACHE -->|"Invalidate 消息"| B_IQ B_IQ -->|"smp_rmb()<br/>处理 Invalidate Queue"| B_CACHE style A_SB fill:#fff9c4,stroke:#f9a825 style B_IQ fill:#fff9c4,stroke:#f9a825
屏障类型x86 指令ARM 指令作用
写屏障(smp_wmb)sfenceDMB ST排空 Store Buffer
读屏障(smp_rmb)lfenceDMB LD处理 Invalidate Queue
全屏障(smp_mb)mfenceDMB SY排空 SB + 处理 IQ

x86 的 TSO 模型只允许 Store-Load 重排,因此 smp_wmbsmp_rmb 在 x86 上是空操作(no-op),只有 smp_mb 编译为 mfence。ARM 的弱序模型允许所有四种重排,每种屏障都必须编译为对应的 DMB 指令。

Note

ARM 的内存模型比 x86 弱得多——ARM 允许 Store-Store、Load-Load、Store-Load、Load-Store 四种重排,而 x86 TSO 只允许 Store-Load 重排。在 ARM 上编写无锁代码时,必须显式添加内存屏障;同样的代码在 x86 上可能碰巧正确,但在 ARM 上会出 bug。跨平台无锁代码必须使用 C++ std::atomic 的内存序参数而非裸屏障指令。

7.3 Hazard Pointer#

Hazard Pointer 是解决无锁数据结构中节点安全释放的机制。每个线程维护一组”危险指针”,在访问共享节点前先注册:

// Hazard Pointer 示意(简化版)
// 每个线程拥有 MAX_HAZARD 个危险指针槽位
constexpr int MAX_HAZARD = 2;
std::atomic<void*> hazard_ptrs[MAX_THREADS][MAX_HAZARD];
// 读者:注册 hazard pointer
void* node = head.load();
hazard_ptrs[my_thread][0].store(node); // 声明"我正在用 node"
// ... 使用 node ...
hazard_ptrs[my_thread][0].store(nullptr); // 用完了,释放声明
// 写者:删除节点前检查
void retire_node(void* node) {
retired_list.push(node);
if (retired_list.size() > threshold) {
scan_retired(); // 遍历所有线程的 hazard pointer
// 只释放没有任何 hazard pointer 指向的节点
}
}
维度Hazard PointerRCU
读者开销1-2 次原子 store仅禁用/启用抢占
写者开销扫描所有线程的 HP等待宽限期
延迟立即回收(无宽限期)宽限期后才回收
适用场景无锁栈/队列读多写极少的链表

7.4 Linux Per-CPU 变量#

Linux 内核大量使用 Per-CPU 变量来消除缓存行弹跳——每个 CPU 拥有变量的独立副本,更新时无需跨核 Invalidate:

// 定义 Per-CPU 变量
DEFINE_PER_CPU(int, my_counter); // 每个 CPU 一个 int 副本
// 更新:只访问本 CPU 的副本,无需原子操作
this_cpu_inc(my_counter); // 等价于 my_counter[cpu]++,无缓存行弹跳
// 读取全局值:需要遍历所有 CPU 的副本
int total = 0;
for_each_possible_cpu(cpu) {
total += per_cpu(my_counter, cpu);
}

Per-CPU 变量之所以能避免缓存行弹跳,是因为每个 CPU 的副本位于不同的缓存行——更新操作只涉及本地缓存行(M 状态),不需要发送 Invalidate 消息。代价是读取全局值需要遍历所有 CPU 的副本,但统计场景下读远少于写,这是可接受的权衡。

7.5 Userspace RCU(liburcu)#

Linux 内核的 RCU 机制无法直接在用户态使用。liburcu(Userspace RCU)提供了等效的用户态实现,被 dpdk、glusterfs 等项目采用:

#include <urcu.h>
#include <urcu-pointer.h>
struct data* global_ptr;
// 读者
void reader() {
rcu_read_lock();
struct data* d = rcu_dereference(global_ptr);
// 使用 d... 无锁,零开销
rcu_read_unlock();
}
// 写者
void writer(struct data* new_data) {
struct data* old = rcu_xchg_pointer(&global_ptr, new_data);
synchronize_rcu(); // 等待所有读者退出
free(old);
}

liburcu 提供多种变体:QSBR(最轻量,需读者显式汇报静止状态)、MB(基于内存屏障,通用但较慢)、Signal(基于信号强制静止状态)。高性能场景推荐 QSBR 变体。

7.6 无锁哈希表#

无锁哈希表的实现比队列和栈复杂得多。主流方案包括:

方案核心思路复杂度
分片锁哈希表每个桶独立加锁低,但非严格无锁
Lock-coupon(Herlihy)CAS 逐桶推进高,理论意义
Folklore 无锁哈希开放寻址 + CAS 插入中,实践中常用

生产环境通常选择分片锁方案——将哈希表分为 N 个桶组,每组独立加锁,在并发度和实现复杂度之间取得平衡。严格的无锁哈希表实现复杂且 ABA 问题严重,目前仅在学术研究中使用。

八、何时使用无锁#

场景推荐原因
共享计数器原子操作FAA 最快,无竞争风险
单生产者单消费者队列无锁队列避免锁开销
高频更新的统计Per-CPU消除缓存行争用
低竞争的共享数据自旋锁简单,开销小
复杂的数据结构修改互斥锁无锁实现复杂,ABA 风险
需要等待的操作互斥锁自旋浪费 CPU
代码可读性优先互斥锁锁更易理解和维护

九、总结#

上一章深入探讨了数据导向设计。

技术开销复杂度适用场景
互斥锁25 ns - 10 μs通用并发
自旋锁10 ns - 1 μs短临界区
原子操作5-10 ns计数器、标志
无锁队列10-50 nsSPSC/MPSC 队列
RCU读: 0 ns, 写: ms读多写极少
Tip

无锁编程的黄金法则:先用锁,性能不够时再考虑无锁。无锁代码更难写、更难验证、更难维护。只有在性能分析证明锁是瓶颈时,才值得投入无锁优化的复杂度。

支持与分享

如果这篇文章对你有帮助,欢迎支持作者或分享给更多人

无锁编程与原子操作
https://blog.souloss.com/posts/cpu-architecture/lock-free-programming/
作者
Souloss
发布于
2026-03-22
许可协议
CC BY-NC-SA 4.0

部分信息可能已经过时