mobile wallpaper 1mobile wallpaper 2mobile wallpaper 3mobile wallpaper 4
978 字
3 分钟
Go sync 包深度解析:并发编程基石
2022-10-31

Go 的 sync 包是并发编程的核心基础,提供了原子操作、互斥锁、条件变量、等待组等关键同步原语。理解这些原语的底层实现和适用场景,是编写高效并发代码的前提。

一、Mutex:互斥锁#

1.1 基本用法#

var (
mu sync.Mutex
value int
)
func update(newVal int) {
mu.Lock()
defer mu.Unlock()
value = newVal
}
func read() int {
mu.Lock()
defer mu.Unlock()
return value
}

1.2 底层实现#

// sync.Mutex 源码简化
// 源码参考:https://github.com/golang/go/blob/go1.25.0/src/sync/mutex.go
// 源码参考:https://github.com/golang/go/blob/go1.25.0/src/sync/mutex.go
type Mutex struct {
state int32 // 32位状态
sema uint32 // 信号量
}
// 状态位
const (
mutexLocked = 1 << iota // 1: 锁被持有
mutexWoken // 2: 有等待者被唤醒
mutexStarving // 4: 饥饿模式
mutexWaiterShift = iota // 等待者数量偏移
)

1.3 饥饿模式#

Go 1.9 引入饥饿模式来解决锁竞争问题:

// 正常模式(默认):等待者 FIFO,先来先服务
// 饥饿模式:锁直接交给等待队列尾部
//
// 切换条件:
// - 等待超过 1ms 切换到饥饿模式
// - 等待队列空了切换回正常模式

Mutex 状态机

stateDiagram-v2 [*] --> Unlocked: 初始化 Unlocked --> Locked: Lock()<br/>成功获取锁 Locked --> Unlocked: Unlock()<br/>释放锁 state Locked { [*] --> 正常模式 正常模式 --> 饥饿模式: 等待者等待 > 1ms 饥饿模式 --> 正常模式: 等待队列为空<br/>或等待者 < 1ms } state 等待队列 { W1: goroutine A W2: goroutine B W3: goroutine C } note right of 正常模式 新 goroutine 可与等待者竞争 可能导致等待者饥饿 end note note right of 饥饿模式 锁直接交给队首等待者 新 goroutine 不竞争,直接排队 end note

Mutex Lock/Unlock 流程

flowchart TD A["Lock()"] --> B{"尝试 CAS 获取锁"} B -- "成功" --> C["获得锁,返回"] B -- "失败" --> D{"是否饥饿模式?"} D -- "是" --> E["直接加入等待队列"] D -- "否" --> F["自旋尝试获取"] F --> G{"自旋成功?"} G -- "是" --> C G -- "否" --> H["加入等待队列"] E --> I["等待被唤醒"] H --> I I --> J{"被唤醒"} J --> K{"是否饥饿模式?"} K -- "是" --> L["直接获得锁"] K -- "否" --> M["与新 goroutine 竞争"] M --> N{"竞争成功?"} N -- "是" --> L N -- "否" --> I L --> C style C fill:#6bcb77 style L fill:#6bcb77

1.4 使用注意#

// 常见错误:在锁外部访问共享数据
mu.Lock()
value = 100 // 这里有锁
fmt.Println(value)
mu.Unlock()
value = 200 // 这里没有锁!危险!
// 另一个常见错误:读也在锁外,同样是数据竞争
mu.Lock()
value = 100
mu.Unlock()
fmt.Println(value) // 锁外读取也有竞争!另一个 goroutine 可能正在写入
// 正确做法:所有对共享变量的读写都必须在锁内
mu.Lock()
value = 100
v := value // 在锁内拷贝
mu.Unlock()
fmt.Println(v) // 使用局部副本,安全

二、RWMutex:读写锁#

2.1 基本用法#

var (
mu sync.RWMutex
data map[string]string
)
func read(key string) string {
mu.RLock() // 读锁
defer mu.RUnlock()
return data[key]
}
func write(key, value string) {
mu.Lock() // 写锁
defer mu.Unlock()
data[key] = value
}

2.2 适用场景#

// 读多写少场景:RWMutex 优于 Mutex
// 读写锁:多个读者或单一写者
// 互斥锁:多个写者或单一写者

RWMutex 读写模式

flowchart TB subgraph 读模式["读模式(共享)"] R1["读者 1"] R2["读者 2"] R3["读者 3"] RD["共享数据"] R1 --> RD R2 --> RD R3 --> RD end subgraph 写模式["写模式(独占)"] W1["写者"] WD["共享数据"] W1 --> WD end subgraph 等待状态["等待状态"] WT["写者等待"] RT["读者等待"] WS["信号量"] WT -->|"等待 readerCount=0"| WS RT -->|"等待写者完成"| WS end style R1 fill:#4d96ff style R2 fill:#4d96ff style R3 fill:#4d96ff style W1 fill:#ff6b6b style WT fill:#ffd93d style RT fill:#ffd93d

RWMutex 状态转换

stateDiagram-v2 [*] --> 无锁 无锁 --> 读锁: RLock()<br/>readerCount++ 无锁 --> 写锁: Lock()<br/>获取 w 锁 读锁 --> 读锁: RLock()<br/>readerCount++ 读锁 --> 无锁: RUnlock()<br/>readerCount-- 读锁 --> 读锁等待写者: Lock()<br/>等待读者退出 读锁等待写者 --> 写锁: 最后一个读者退出<br/>唤醒写者 写锁 --> 无锁: Unlock() 写锁 --> 写锁等待读者: RLock()<br/>读者等待 写锁等待读者 --> 读锁: Unlock()<br/>唤醒读者 note right of 读锁 多个读者可同时持有 readerCount > 0 end note note right of 写锁 写者独占 阻塞所有读者和写者 end note

2.3 底层实现#

// 源码参考:https://github.com/golang/go/blob/go1.25.0/src/sync/rwmutex.go
type RWMutex struct {
w Mutex // 写锁入口
writerSem uint32 // 写者信号量
readerSem uint32 // 读者信号量
readerCount atomic.Int32 // 当前读者数
readerWait atomic.Int32 // 等待的写者数
}

三、WaitGroup:等待组#

3.1 基本用法#

var wg sync.WaitGroup
func main() {
for i := 0; i < 3; i++ {
wg.Add(1) // 计数 +1
go func(id int) {
defer wg.Done() // 计数 -1
process(id)
}(i)
}
wg.Wait() // 阻塞直到计数归零
fmt.Println("All done")
}

3.2 常见模式#

// 模式1:启动 workers
func processAll(items []Item) error {
var wg sync.WaitGroup
errCh := make(chan error, len(items))
for _, item := range items {
wg.Add(1)
go func(item Item) {
defer wg.Done()
if err := process(item); err != nil {
errCh <- err
}
}(item)
}
wg.Wait()
close(errCh)
var errs []error
for err := range errCh {
errs = append(errs, err)
}
return errors.Join(errs...)
}
// 模式2:扇出
func fanOut(ctx context.Context, tasks []Task) error {
var wg sync.WaitGroup
for _, task := range tasks {
wg.Add(1)
go func(t Task) {
defer wg.Done()
execute(ctx, t)
}(task)
}
wg.Wait()
return nil
}

3.3 注意事项#

// 常见错误:Add 和 Done 不配对
func wrong() {
wg.Add(1) // +1
go func() {
defer wg.Done() // -1
// ...
}()
// 如果 goroutine panic,Done 不会执行
wg.Wait() // 永远等待!
}
// 正确做法:使用 defer
func correct() {
wg.Add(1)
go func() {
defer wg.Done() // 确保执行
// ...
}()
wg.Wait()
}

WaitGroup 计数器变化

sequenceDiagram participant M as 主 goroutine participant W as WaitGroup participant G1 as goroutine 1 participant G2 as goroutine 2 participant G3 as goroutine 3 M->>W: Add(3), counter=3 M->>G1: go worker() M->>G2: go worker() M->>G3: go worker() M->>W: Wait() 阻塞 par 并发执行 G1->>W: Done(), counter=2 and G2->>W: Done(), counter=1 and G3->>W: Done(), counter=0 end W-->>M: 计数归零,唤醒 M->>M: 继续执行

WaitGroup 与 Channel 模式对比

flowchart LR subgraph WaitGroup模式["WaitGroup 模式"] WG1["wg.Add(n)"] WG2["启动 n 个 goroutine"] WG3["wg.Wait()"] WG1 --> WG2 --> WG3 end subgraph Channel模式["Channel 模式"] CH1["ch := make(chan struct{}, n)"] CH2["启动 n 个 goroutine<br/>完成后 ch <- struct{}{}"] CH3["for i := 0; i < n; i++ { <-ch }"] CH1 --> CH2 --> CH3 end subgraph 选择建议["选择建议"] S1["WaitGroup: 更简洁<br/>适合等待多个 goroutine"] S2["Channel: 更灵活<br/>可以传递结果"] end WaitGroup模式 --> S1 Channel模式 --> S2 style WG3 fill:#6bcb77 style CH3 fill:#6bcb77

四、Once:单次执行#

4.1 基本用法#

var (
once sync.Once
config *Config
)
func getConfig() *Config {
once.Do(func() {
config = loadConfig() // 只执行一次
})
return config
}

4.2 实现原理#

// 源码参考:https://github.com/golang/go/blob/go1.25.0/src/sync/once.go
type Once struct {
done atomic.Uint32
m Mutex
}
func (o *Once) Do(f func()) {
if o.done.Load() == 0 { // 快速路径
o.doSlow(f)
}
}
func (o *Once) doSlow(f func()) {
o.m.Lock()
defer o.m.Unlock()
if o.done.Load() == 0 { // 双重检查
f()
o.done.Store(1)
}
}

4.3 实际应用#

// 应用1:单例模式
type DB struct {
conn *sql.DB
}
var (
db *DB
dbOnce sync.Once
)
func GetDB() *DB {
dbOnce.Do(func() {
db = &DB{conn: connect()}
})
return db
}
// 应用2:初始化
type Service struct {
once sync.Once
client *http.Client
}
func (s *Service) init() {
s.once.Do(func() {
s.client = &http.Client{
Timeout: 30 * time.Second,
Transport: &http.Transport{
MaxIdleConns: 100,
},
}
})
}

五、Cond:条件变量#

5.1 基本用法#

var (
cond = sync.NewCond(&sync.Mutex{})
ready bool
)
func waitForReady() {
cond.L.Lock()
for !ready {
cond.Wait() // 阻塞并释放锁
}
cond.L.Unlock()
fmt.Println("Ready!")
}
func setReady() {
cond.L.Lock()
ready = true
cond.L.Unlock()
cond.Signal() // 唤醒一个等待者
// 或 cond.Broadcast() 唤醒所有
}

5.2 Wait 的原理#

// cond.Wait() 内部操作:
// 1. 释放锁
// 2. 阻塞等待信号
// 3. 重新获取锁
// 因此必须在循环中使用:
for !condition {
cond.Wait() // 自动释放和重新获取锁
}

Cond Wait/Signal 流程

sequenceDiagram participant W as 等待者 goroutine participant L as Mutex 锁 participant C as Cond 条件变量 participant S as 发送者 goroutine W->>L: Lock() Note over W: 持有锁 W->>C: Wait() Note over C: 1. 释放锁<br/>2. 加入等待队列<br/>3. 阻塞 L-->>S: 锁可用 S->>L: Lock() Note over S: 持有锁 S->>S: 修改条件 S->>L: Unlock() S->>C: Signal() Note over C: 唤醒一个等待者 C->>W: 唤醒 W->>L: 重新获取锁 Note over W: 持有锁 W->>W: 检查条件 alt 条件满足 W->>L: Unlock() Note over W: 继续执行 else 条件不满足 W->>C: Wait() Note over W: 再次等待 end

Signal vs Broadcast

flowchart TB subgraph Signal["Signal() - 唤醒一个"] S1["等待者 A"] -->|"等待"| C1[Cond] S2["等待者 B"] -->|"等待"| C1 S3["等待者 C"] -->|"等待"| C1 C1 -->|"Signal"| S1 S2 -.->|"继续等待"| C1 S3 -.->|"继续等待"| C1 end subgraph Broadcast["Broadcast() - 唤醒全部"] B1["等待者 A"] -->|"等待"| C2[Cond] B2["等待者 B"] -->|"等待"| C2 B3["等待者 C"] -->|"等待"| C2 C2 -->|"Broadcast"| B1 C2 -->|"Broadcast"| B2 C2 -->|"Broadcast"| B3 end style S1 fill:#6bcb77 style B1 fill:#6bcb77 style B2 fill:#6bcb77 style B3 fill:#6bcb77

5.3 实际应用#

// 应用:实现简单的信号量
type Semaphore struct {
cond *sync.Cond
count int
max int
}
func NewSemaphore(max int) *Semaphore {
return &Semaphore{
cond: sync.NewCond(&sync.Mutex{}),
max: max,
}
}
func (s *Semaphore) Acquire() {
s.cond.L.Lock()
for s.count >= s.max {
s.cond.Wait()
}
s.count++
s.cond.L.Unlock()
}
func (s *Semaphore) Release() {
s.cond.L.Lock()
s.count--
s.cond.Signal() // 唤醒一个等待者
s.cond.L.Unlock()
}

六、Pool:对象池#

6.1 基本用法#

var bufferPool = sync.Pool{
New: func() any {
return &bytes.Buffer{}
},
}
func getBuffer() *bytes.Buffer {
buf := bufferPool.Get().(*bytes.Buffer)
buf.Reset() // 重置状态
return buf
}
func putBuffer(buf *bytes.Buffer) {
bufferPool.Put(buf)
}

6.2 使用场景#

// 适合:高频分配/释放的轻量级对象
// - bytes.Buffer 重用
// - 临时解析器
// - 连接缓冲
// 不适合:需要保持状态的对象
// - 数据库连接(用 sql.DB)
// - 需要 pool.Get() 返回的对象,可能被其他人拿走

6.3 注意事项#

// Pool 中的对象可能会被 GC 回收
// 不要依赖 Pool 中对象的长期存在
// 错误:缓存连接
var connPool = sync.Pool{
New: func() any {
return createExpensiveConnection()
},
}
// 因为:GC 时 Pool 会被清空,连接丢失
// 正确:用专门的池管理(如 google.golang.org/grpc/balancer_conn_state_evaluator)

sync.Pool 生命周期

flowchart TB subgraph 创建["创建阶段"] N["New 函数"] --> P["Pool.local"] end subgraph 使用["使用阶段"] G1["Get()"] -->|"查找本地 P"| L["local.poolLocal"] L -->|"找到"| R1["返回对象"] L -->|"未找到"| N N --> R1 R2["使用对象"] --> PU["Put()"] PU -->|"放回"| L end subgraph GC["GC 阶段"] GC1["GC 触发"] --> V["victim cache 转移"] V -->|"下次 GC"| C["清空 victim"] end P --> G1 R1 --> R2 L --> GC1 style N fill:#4d96ff style R1 fill:#6bcb77 style R2 fill:#ffd93d style GC1 fill:#ff6b6b style C fill:#ff6b6b

Pool Get/Put 详细流程

sequenceDiagram participant G as goroutine participant P as P.local participant V as victim cache participant N as New 函数 G->>P: Get() alt 本地 P 有对象 P-->>G: 返回对象 else 本地 P 为空 G->>V: 检查 victim cache alt victim 有对象 V-->>G: 返回对象 else victim 为空 G->>N: 调用 New() N-->>G: 返回新对象 end end Note over G: 使用对象... G->>P: Put(obj) Note over P: 对象放回本地 P Note over V: GC 时,P → V<br/>下次 GC 时 V 被清空

七、sync.Map#

7.1 基本用法#

var m sync.Map
// 存储
m.Store("key", "value")
// 读取
value, ok := m.Load("key")
// 删除
m.Delete("key")
// 遍历
m.Range(func(key, value any) bool {
fmt.Println(key, value)
return true // 返回 false 停止遍历
})
// 加载或存储
value, loaded := m.LoadOrStore("key", "newValue")
// 如果 key 存在,loaded=true,value 是旧值
// 如果 key 不存在,loaded=false,value 是新值

7.2 适用场景#

// sync.Map 适合的场景:
// 1. 读多写少且 key 相对稳定
// 2. 多个 goroutine 独立读写不同的 key
// 3. 需要并发安全的 map
// 不适合的场景:
// 1. 需要对同一个 key 进行读写
// 2. 写多读少(用 Mutex + map)
// 3. 需要原子操作(如 LoadAndDelete)

7.3 性能对比#

// 读多写少:sync.Map 优势明显
// 写多读少:Mutex + map 更好
// benchmark 结果(仅供参考)
// 读多写少:sync.Map 比 Mutex+map 快 3-10 倍
// 写多读少:Mutex+map 比 sync.Map 快 2-5 倍

八、atomic 原子操作#

8.1 常用操作#

import "sync/atomic"
var counter atomic.Int64
// 原子加
counter.Add(1)
// 原子加载
n := counter.Load()
// 原子存储
counter.Store(100)
// 原子交换(返回旧值)
old := counter.Swap(200)
// 原子比较并交换
swapped := counter.CompareAndSwap(100, 200)
// 只有当 counter == 100 时才设置为 200

8.2 指针操作#

type Config struct {
Timeout int
}
var config atomic.Value
// 存储
config.Store(&Config{Timeout: 30})
// 加载
cfg := config.Load().(*Config)

8.3 实战应用#

// 应用1:实现无锁队列(简单版)
type LockFreeQueue struct {
head atomic.Pointer[node]
tail atomic.Pointer[node]
}
// 应用2:实现计数器
type Counter struct {
count atomic.Int64
}
func (c *Counter) Inc() {
c.count.Add(1)
}
func (c *Counter) Get() int64 {
return c.count.Load()
}

九、Map 与sync.Map 对比#

场景内置 map + Mutexsync.Map
读多写少一般更好
写多读少更好一般
读写都多更好一般
Key 稳定一般更好
Key 动态变化更好一般
需要原子操作更好不支持

十、总结#

sync 包是 Go 并发编程的核心:

原语用途
Mutex互斥锁,保护临界区
RWMutex读写锁,读多写少场景
WaitGroup等待一组 goroutine 完成
Once单次执行
Cond条件等待
Pool对象池,复用临时对象
sync.Map并发安全 map
atomic原子操作

sync 原语选择决策图

flowchart TD A["需要同步机制"] --> B{"需要保护共享数据?"} B -- "否" --> C{"需要等待多个 goroutine?"} C -- "是" --> D["WaitGroup"] C -- "否" --> E{"需要单次初始化?"} E -- "是" --> F["Once"] E -- "否" --> G{"需要条件等待?"} G -- "是" --> H["Cond"] G -- "否" --> I["可能不需要 sync"] B -- "是" --> J{"读多写少?"} J -- "是" --> K["RWMutex"] J -- "否" --> L{"需要并发安全 map?"} L -- "是" --> M["sync.Map"] L -- "否" --> N{"需要原子操作?"} N -- "是" --> O["atomic"] N -- "否" --> P["Mutex"] A --> Q{"需要复用临时对象?"} Q -- "是" --> R["Pool"] style D fill:#4d96ff style F fill:#4d96ff style H fill:#4d96ff style K fill:#6bcb77 style M fill:#6bcb77 style O fill:#6bcb77 style P fill:#6bcb77 style R fill:#ffd93d

掌握这些原语的底层原理和适用场景,是编写高效、正确并发代码的基础。

九、常见问题#

Q1:Mutex 和 RWMutex 怎么选?#

读写比例是关键:读多写少用 RWMutex(多个读者可并发),写多或读写相当用 Mutex(RWMutex 的写锁开销更大)。基准测试显示读:写 > 10:1 时 RWMutex 有明显优势。

Q2:sync.Once 和 init 有什么区别?#

init 在程序启动时执行,无法延迟或条件执行。sync.Once 在首次调用时执行,支持延迟初始化和错误重试。需要运行时初始化(如数据库连接)用 Once。

Q3:sync.Pool 的对象什么时候会被回收?#

Pool 中的对象在每次 GC 时可能被清除。这意味着 Pool 不适合长期持有对象,只适合复用临时对象。GC 间隔影响 Pool 的命中率。

Q4:sync.Map 和 map+mutex 怎么选?#

读多写少且 key 相对固定用 sync.Map(读操作无锁),写多读少用 map+RWMutex(sync.Map 的写操作需要 dirty 提升,开销大)。

小结#

  • Mutex 使用信号量实现,正常路径无系统调用,性能优于 RWMutex
  • RWMutex 适合读多写少场景,写锁饥饿问题在 Go 1.20 后得到改善
  • WaitGroup 通过计数器+信号量实现,Add/Done/Wait 必须正确配对
  • sync.Once 保证仅执行一次,支持延迟初始化
  • sync.Pool 复用临时对象减少 GC 压力,但对象可能在 GC 时被清除

参考资料#

支持与分享

如果这篇文章对你有帮助,欢迎支持作者或分享给更多人

Go sync 包深度解析:并发编程基石
https://blog.souloss.com/posts/golang/go-sync/
作者
Souloss
发布于
2022-10-31
许可协议
CC BY-NC-SA 4.0

部分信息可能已经过时