mobile wallpaper 1mobile wallpaper 2mobile wallpaper 3mobile wallpaper 4
2883 字
8 分钟
Go 协程与通道:CSP 模型实现
2022-07-17

1. CSP 模型:Go 并发的理论基础#

1.1 什么是 CSP?#

CSP(Communicating Sequential Processes,通信顺序进程)是由 Tony Hoare 于 1978 年提出的并发编程理论。其核心思想是:

不要通过共享内存来通信,而要通过通信来共享内存。

Go 语言的并发设计深受 CSP 影响,通过以下两个原语实现了这一模型:

  • goroutine:轻量级的并发执行单元
  • channel:goroutine 之间的通信管道

1.2 传统并发模型 vs CSP 模型#

传统并发模型(共享内存):
┌──────────┐ ┌──────────┐
│Thread A │ │Thread B │
│ │ │ │
│ ┌──────▼─────▼──────┐ │
│ │ 共享内存 │ │
│ │ + 锁机制 │ │
│ └──────────────────┘ │
└──────────┘ └──────────┘
需要显式同步,容易出错
CSP 模型(消息传递):
┌──────────┐ ┌──────────┐
│goroutine │ ── channel ──▶ │goroutine │
│ A │ │ B │
└──────────┘ └──────────┘
通信即同步,更安全

2. Goroutine:轻量级并发单元#

2.1 goroutine 的创建:newproc#

当使用 go func() 启动一个 goroutine 时,编译器会将其转换为对 runtime.newproc 的调用:

// 用户代码
go hello()
// 编译后的等价代码
runtime.newproc(hello)

newproc 的核心实现:

// src/runtime/proc.go — https://github.com/golang/go/blob/go1.25.0/src/runtime/proc.go
func newproc(fn *funcval) {
gp := getg()
pc := getcallerpc()
systemstack(func() {
newg := newproc1(fn, gp, pc)
// 获取当前 P
pp := gp.m.p.ptr()
// 将新 G 放入运行队列
runqput(pp, newg, true)
// 如果有空闲 P,尝试唤醒或创建 M
if mainStarted {
wakep()
}
})
}

newproc1 负责创建新的 goroutine 结构:

func newproc1(fn *funcval, callergp *g, callerpc uintptr) *g {
pp := callergp.m.p.ptr()
// 尝试从 P 的空闲列表获取 G(复用)
newg := pp.gfree
if newg == nil {
// 没有 G 可复用,创建新的
newg = malg(stackMin) // stackMin = 2048 字节
}
// 计算栈空间并初始化
totalSize := uintptr(4*goarch.PtrSize + sys.MinFrameSize)
totalSize = alignUp(totalSize, sys.StackAlign)
// 设置 goroutine 的入口函数
sp := newg.stack.hi - totalSize
memclrNoHeapPointers(sp, totalSize)
// 初始化调度上下文
newg.sched.sp = sp
newg.sched.pc = abi.FuncPCABI0(goexit) + sys.PCQuantum
newg.sched.g = guintptr(unsafe.Pointer(newg))
// 设置真正的入口函数
gostartcallfn(&newg.sched, fn)
// 设置状态为 _Grunnable
newg.atomicstatus = _Grunnable
return newg
}

2.2 goroutine 的状态转换#

goroutine 在其生命周期中会经历多个状态:

┌─────────────────────────────────────────────────────────┐
│ │
▼ │
┌───────────┐ ┌───────────┐ │
│ _Gidle │ │ _Gdead │ │
│ (新建) │ │ (已退出) │ │
└─────┬─────┘ └─────▲─────┘ │
│ │ │
│ newproc │ goexit │
▼ │ │
┌───────────┐ schedule ┌───────────┐ │ │
┌────────▶│_Grunnable │────────────▶│ _Grunning │──┘ │
│ │ (可运行) │ │ (运行中) │ │
│ └───────────┘ └─────┬─────┘ │
│ ▲ │ │
│ │ │ │
│ │ ┌───────────────┼───────────────┐ │
│ │ │ │ │ │
│ │ ▼ ▼ ▼ │
│ │ ┌───────────┐ ┌───────────┐ ┌───────────┐ │
│ └───│_Gsyscall │ │ _Gwaiting │ │ _Gcopystack│ │
│ │(系统调用) │ │ (等待中) │ │ (栈扩展) │ │
│ └───────────┘ └───────────┘ └───────────┘ │
│ │
└─────────────────────── Goroutine 复用池 ───────────────────────────────┘

2.3 goroutine 的销毁#

当 goroutine 执行完毕,会调用 goexit

// src/runtime/proc.go — https://github.com/golang/go/blob/go1.25.0/src/runtime/proc.go
func goexit() {
// 切换到 g0 执行清理
mcall(goexit0)
}
func goexit0(gp *g) {
mp := getg().m
pp := mp.p.ptr()
// 重置 goroutine 状态
casgstatus(gp, _Grunning, _Gdead)
gp.m = nil
// 解绑 M
mp.curg = nil
// 将 G 放入 P 的空闲列表(复用)
pp.gfree = gp
pp.gfree++
// 重新调度
schedule()
}

关键点:goroutine 不会立即释放,而是放入 P 的空闲列表,供后续复用。这避免了频繁的内存分配开销。

3. Channel:goroutine 之间的通信管道#

Channel 通信架构#

graph TD subgraph "hchan 结构体" BCOUNT["buf: 循环缓冲区(有缓冲 channel)"] QSEND["sendq: 等待发送的 G 队列"] QRECV["recvq: 等待接收的 G 队列"] LOCK["lock: 互斥锁"] CNT["count: 缓冲区元素数"] end subgraph "发送流程" S1["ch <- x"] --> S2{"缓冲区未满?"} S2 --> |"是"| S3["写入缓冲区"] S2 --> |"否"| S4["挂起 G 到 sendq"] end subgraph "接收流程" R1["<- ch"] --> R2{"缓冲区非空?"} R2 --> |"是"| R3["从缓冲区读取"] R2 --> |"否"| R4["挂起 G 到 recvq"] end style BCOUNT fill:#4CAF50,color:#fff style S4 fill:#FF9800,color:#fff style R4 fill:#FF9800,color:#fff

Channel 通信模式#

graph LR subgraph "无缓冲 channel(同步)" G1A["G1: ch <- x"] --> |"阻塞直到 G2 接收"| G2A["G2: <- ch"] end subgraph "有缓冲 channel(异步)" G1B["G1: ch <- x"] --> |"写入缓冲区不阻塞"| BUF["buf[x]"] BUF --> |"G2 读取"| G2B["G2: <- ch"] end style G1A fill:#FF9800,color:#fff style G1B fill:#4CAF50,color:#fff

3.1 hchan:channel 的底层数据结构#

channel 在运行时由 hchan 结构体表示:

// src/runtime/chan.go — https://github.com/golang/go/blob/go1.25.0/src/runtime/chan.go
type hchan struct {
qcount uint // 队列中的元素数量
dataqsiz uint // 循环队列的大小(缓冲区容量)
buf unsafe.Pointer // 指向循环队列的指针(仅缓冲 channel)
elemsize uint16 // 每个元素的大小
closed uint32 // channel 是否已关闭
elemtype *_type // 元素类型信息
sendx uint // 发送索引
recvx uint // 接收索引
recvq waitq // 等待接收的 goroutine 队列
sendq waitq // 等待发送的 goroutine 队列
lock mutex // 互斥锁,保护 hchan 的所有字段
}
type waitq struct {
first *sudog // 队列头
last *sudog // 队列尾
}
type sudog struct {
g *g // 等待的 goroutine
elem unsafe.Pointer // 数据指针
next *sudog
prev *sudog
// ...
}

3.2 channel 创建#

make(chan T, N) 会被编译为 runtime.makechan

func makechan(t *chantype, size int) *hchan {
elem := t.Elem
// 计算需要的内存大小
mem, overflow := math.MulUintptr(elem.Size_, uintptr(size))
var c *hchan
switch {
case mem == 0:
// 无缓冲 channel
c = (*hchan)(mallocgc(hchanSize, nil, true))
case elem.PtrBytes == 0:
// 元素不包含指针,可以一次性分配
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// 元素包含指针,需要单独分配缓冲区
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
c.elemsize = uint16(elem.Size_)
c.elemtype = elem
c.dataqsiz = uint(size)
return c
}

3.3 channel 发送原理#

ch <- x 会被编译为 runtime.chansend1,最终调用 chansend

func chansend(c *hchan, block bool, elem unsafe.Pointer) bool {
// 加锁
lock(&c.lock)
// 检查 channel 是否已关闭
if c.closed != 0 {
unlock(&c.lock)
panic("send on closed channel")
}
// 情况1:有等待接收的 goroutine
if sg := c.recvq.dequeue(); sg != nil {
// 直接发送给等待者
send(c, sg, elem, true)
unlock(&c.lock)
return true
}
// 情况2:缓冲区还有空间
if c.qcount < c.dataqsiz {
// 放入缓冲区
qp := chanbuf(c, c.sendx)
typedmemmove(c.elemtype, qp, elem)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}
// 情况3:非阻塞模式直接返回
if !block {
unlock(&c.lock)
return false
}
// 情况4:阻塞等待
gp := getg()
mysg := acquireSudog()
mysg.g = gp
mysg.elem = elem
c.sendq.enqueue(mysg)
// 等待唤醒
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceBlockChanSend, 2)
// 被唤醒后继续执行
releaseSudog(mysg)
return true
}

发送流程图

ch <- x 发送流程:
├──▶ 检查 channel 是否关闭?
│ │
│ ├── 已关闭 ──▶ panic("send on closed channel")
│ │
│ └── 未关闭 ──▶ 继续
├──▶ 有等待接收的 goroutine?
│ │
│ ├── 有 ──▶ 直接发送,唤醒接收者
│ │
│ └── 无 ──▶ 继续
├──▶ 缓冲区有空间?
│ │
│ ├── 有 ──▶ 放入缓冲区,返回
│ │
│ └── 无 ──▶ 继续
├──▶ 非阻塞模式?
│ │
│ ├── 是 ──▶ 返回 false
│ │
│ └── 否 ──▶ 当前 G 入队等待,阻塞

3.4 channel 接收原理#

x := <-ch 会被编译为 runtime.chanrecv1chanrecv2(带 ok 返回值):

func chanrecv(c *hchan, block bool, elem unsafe.Pointer) (received bool) {
lock(&c.lock)
// 情况1:channel 已关闭且缓冲区为空
if c.closed != 0 && c.qcount == 0 {
unlock(&c.lock)
if elem != nil {
// 清零目标位置
memclrNoHeapPointers(elem, c.elemsize)
}
return false
}
// 情况2:有等待发送的 goroutine
if sg := c.sendq.dequeue(); sg != nil {
// 从发送者接收数据
recv(c, sg, elem, true)
unlock(&c.lock)
return true
}
// 情况3:缓冲区有数据
if c.qcount > 0 {
qp := chanbuf(c, c.recvx)
if elem != nil {
typedmemmove(c.elemtype, elem, qp)
}
memclrNoHeapPointers(qp, c.elemsize)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true
}
// 情况4:非阻塞模式
if !block {
unlock(&c.lock)
return false
}
// 情况5:阻塞等待
gp := getg()
mysg := acquireSudog()
mysg.g = gp
mysg.elem = elem
c.recvq.enqueue(mysg)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceBlockChanRecv, 2)
releaseSudog(mysg)
return true
}

3.5 有缓冲 vs 无缓冲通道#

无缓冲通道(unbuffered channel):
┌─────────────────────────────────────────────────────────────────┐
│ │
│ G1 (发送者) ch G2 (接收者) │
│ ┌───────┐ ┌─────────┐ ┌───────┐ │
│ │ │ │ │ │ │ │
│ │ ch<-x │ ─────────▶│ 同步点 │◀──────────│ <-ch │ │
│ │ │ 阻塞 │ (握手) │ 阻塞 │ │ │
│ └───────┘ └─────────┘ └───────┘ │
│ │
│ 发送和接收必须同时准备好,否则阻塞 │
│ sendq 或 recvq 最多有一个非空 │
└─────────────────────────────────────────────────────────────────┘
有缓冲通道(buffered channel):
┌─────────────────────────────────────────────────────────────────┐
│ │
│ G1 (发送者) ch G2 (接收者) │
│ ┌───────┐ ┌─────────────────┐ ┌───────┐ │
│ │ │ │ buf: [x][y][ ] │ │ │ │
│ │ ch<-z │ ──────▶ │ ↑ │ ─────▶│ <-ch │ │
│ │ │ │ sendx=2 │ │ │ │
│ └───────┘ │ recvx=0 │ └───────┘ │
│ │ qcount=2 │ │
│ └─────────────────┘ │
│ │
│ 缓冲区未满时可发送,非空时可接收 │
│ sendq 和 recvq 可能同时为空 │
└─────────────────────────────────────────────────────────────────┘

3.6 channel 关闭#

close(ch) 调用 runtime.closechan

func closechan(c *hchan) {
lock(&c.lock)
// 不能关闭已关闭的 channel
if c.closed != 0 {
unlock(&c.lock)
panic("close of closed channel")
}
c.closed = 1
// 唤醒所有等待接收的 goroutine
var glist gList
for {
sg := c.recvq.dequeue()
if sg == nil {
break
}
sg.elem = nil
glist.push(sg.g)
}
// 唤醒所有等待发送的 goroutine(会 panic)
for {
sg := c.sendq.dequeue()
if sg == nil {
break
}
sg.elem = nil
glist.push(sg.g)
}
unlock(&c.lock)
// 唤醒所有 goroutine
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
goready(gp, 3)
}
}

关闭 channel 的规则

操作nil channel已关闭的 channel正常 channel
发送永久阻塞panic正常发送或阻塞
接收永久阻塞返回零值,ok=false正常接收或阻塞
关闭panicpanic正常关闭

4. Select:多路复用#

4.1 select 的实现原理#

select 语句用于同时监听多个 channel 操作。Go 编译器会将 select 转换为对 runtime.selectgo 的调用:

// 用户代码
select {
case v := <-ch1:
// ...
case ch2 <- x:
// ...
default:
// ...
}
// 编译后的结构
var cases []scase
cases[0] = scase{kind: caseRecv, c: ch1, elem: &v}
cases[1] = scase{kind: caseSend, c: ch2, elem: &x}
cases[2] = scase{kind: caseDefault}
chosen, _ := selectgo(cases, ...)

4.2 scase 结构#

type scase struct {
c *hchan // channel
elem unsafe.Pointer // 数据指针
kind uint16 // case 类型
}
const (
caseNil = iota
caseRecv
caseSend
caseDefault
)

4.3 selectgo 核心逻辑#

func selectgo(cas0 *scase, order0 *uint16, ...) (int, bool) {
// 1. 打乱 case 顺序(公平性)
ncases := len(cases)
order := order0[:2*ncases]
for i := 0; i < ncases; i++ {
order[i] = uint16(i)
}
// Fisher-Yates 洗牌
for i := ncases - 1; i > 0; i-- {
j := fastrandn(i + 1)
order[i], order[j] = order[j], order[i]
}
// 2. 第一遍扫描:检查是否有 case 可以立即执行
for i := 0; i < ncases; i++ {
cas := &cases[order[i]]
switch cas.kind {
case caseRecv:
if c.qcount > 0 || c.closed != 0 {
// 可以接收
return int(order[i]), c.qcount > 0
}
case caseSend:
if c.qcount < c.dataqsiz || c.recvq.first != nil {
// 可以发送
return int(order[i]), false
}
case caseDefault:
return int(order[i]), false
}
}
// 3. 没有 case 可立即执行,将当前 G 加入所有 channel 的等待队列
gp := getg()
for i := 0; i < ncases; i++ {
cas := &cases[i]
if cas.kind == caseRecv {
c.recvq.enqueue(sudog{g: gp})
} else if cas.kind == caseSend {
c.sendq.enqueue(sudog{g: gp})
}
}
// 4. 阻塞等待
gopark(nil, nil, waitReasonSelect, traceBlockSelect, 1)
// 5. 被唤醒,找到是哪个 case
// ... 清理其他等待队列,返回结果
}

4.4 select 流程图#

select 执行流程:
├──▶ 打乱 case 顺序(随机公平)
├──▶ 第一遍扫描(不阻塞)
│ │
│ ├── 有可执行的 case ──▶ 执行并返回
│ │
│ └── 没有可执行的 case ──▶ 继续
├──▶ 有 default
│ │
│ ├── 有 ──▶ 执行 default 并返回
│ │
│ └── 无 ──▶ 继续
├──▶ 将 G 加入所有 channel 的等待队列
├──▶ 阻塞等待
└──▶ 被唤醒 ──▶ 清理其他等待队列,返回结果

5. 通道方向与类型安全#

5.1 单向通道#

Go 支持声明单向通道类型,用于类型检查:

// 只发送通道
func sendOnly(ch chan<- int) {
ch <- 42
}
// 只接收通道
func receiveOnly(ch <-chan int) int {
return <-ch
}
// 双向通道可以隐式转换为单向通道
func main() {
ch := make(chan int)
sendOnly(ch) // chan int -> chan<- int
receiveOnly(ch) // chan int -> <-chan int
}

注意:单向通道是类型系统的约束,底层 hchan 结构完全相同。编译器会阻止错误的操作。

5.2 类型安全的好处#

type ChannelConfig struct {
Send chan<- Data // 只能发送
Receive <-chan Data // 只能接收
}
// 编译时检查
func process(ch ChannelConfig) {
// ch.Send <- data // 正确
// <-ch.Send // 编译错误:不能从发送通道接收
// ch.Receive <- data // 编译错误:不能向接收通道发送
// data := <-ch.Receive // 正确
}

6. 常见并发模式#

6.1 Fan-Out(扇出)#

一个生产者,多个消费者:

┌──────────┐ ┌──────────┐
│Producer │────▶│ Channel │
└──────────┘ └────┬─────┘
┌─────────────┼─────────────┐
▼ ▼ ▼
┌─────────┐ ┌─────────┐ ┌─────────┐
│Worker 1 │ │Worker 2 │ │Worker 3 │
│Consumer │ │Consumer │ │Consumer │
└─────────┘ └─────────┘ └─────────┘

实现示例:

func fanOut(jobs <-chan Task, workers int) []<-chan Result {
results := make([]<-chan Result, workers)
for i := 0; i < workers; i++ {
results[i] = worker(jobs)
}
return results
}
func worker(jobs <-chan Task) <-chan Result {
out := make(chan Result)
go func() {
defer close(out)
for job := range jobs {
out <- process(job)
}
}()
return out
}

6.2 Pipeline(管道)#

多个处理阶段串联:

Stage 1 Stage 2 Stage 3
┌─────────┐ ┌─────────┐ ┌─────────┐
│Generator│────▶│Transform│────▶│Consumer │
│ │ │ │ │ │
│ chan A │ │ chan B │ │ chan C │
└─────────┘ └─────────┘ └─────────┘

实现示例:

func pipeline() {
// Stage 1: 生成数据
nums := generate(1, 2, 3, 4, 5)
// Stage 2: 平方
squares := square(nums)
// Stage 3: 打印
for s := range squares {
fmt.Println(s)
}
}
func generate(nums ...int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for _, n := range nums {
out <- n
}
}()
return out
}
func square(in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
out <- n * n
}
}()
return out
}

6.3 Scatter-Gather(分发-聚合)#

并行处理后合并结果:

┌──────────┐
│ Scatter │
│ (分发任务)│
└────┬─────┘
┌───────────────┼───────────────┐
▼ ▼ ▼
┌─────────┐ ┌─────────┐ ┌─────────┐
│Worker 1 │ │Worker 2 │ │Worker 3 │
│ 处理 │ │ 处理 │ │ 处理 │
└────┬────┘ └────┬────┘ └────┬────┘
│ │ │
└───────────────┼───────────────┘
┌──────────┐
│ Gather │
│ (聚合结果)│
└──────────┘

实现示例:

func scatterGather(inputs []Task, workers int) []Result {
// Scatter: 分发任务
tasks := make(chan Task, len(inputs))
for _, input := range inputs {
tasks <- input
}
close(tasks)
// 启动 workers
results := make(chan Result, workers)
var wg sync.WaitGroup
wg.Add(workers)
for i := 0; i < workers; i++ {
go func() {
defer wg.Done()
for task := range tasks {
results <- process(task)
}
}()
}
// Gather: 等待所有 worker 完成
go func() {
wg.Wait()
close(results)
}()
// 收集结果
var allResults []Result
for r := range results {
allResults = append(allResults, r)
}
return allResults
}

6.4 Worker Pool(工作池)#

固定数量的 worker 处理任务:

任务队列
┌──────────────┐
│ [Job][Job][ ]│
└──────┬───────┘
┌──────────┼──────────┐
▼ ▼ ▼
┌─────┐ ┌─────┐ ┌─────┐
│W1 │ │W2 │ │W3 │
│Pool │ │Pool │ │Pool │
└──┬──┘ └──┬──┘ └──┬──┘
│ │ │
└──────────┼──────────┘
┌──────────────┐
│ 结果队列 │
└──────────────┘

实现示例:

func workerPool(workers int, jobs <-chan Job, results chan<- Result) {
var wg sync.WaitGroup
wg.Add(workers)
for i := 0; i < workers; i++ {
go func(id int) {
defer wg.Done()
for job := range jobs {
results <- process(job)
}
}(i)
}
wg.Wait()
close(results)
}

6.5 Context 取消模式#

使用 context 实现超时和取消:

func worker(ctx context.Context, jobs <-chan Job) <-chan Result {
out := make(chan Result)
go func() {
defer close(out)
for {
select {
case <-ctx.Done():
return // 取消
case job, ok := <-jobs:
if !ok {
return
}
select {
case out <- process(job):
case <-ctx.Done():
return
}
}
}
}()
return out
}
// 使用示例
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
jobs := make(chan Job, 10)
results := worker(ctx, jobs)
// ... 发送任务并处理结果
}

7. Channel 快速路径(Fast Path)#

7.1 非阻塞操作的快速路径#

Go runtime 对 channel 操作实现了快速路径(fast path)优化:在获取 hchan.lock 之前,先通过无锁的原子读取快速判断操作是否必然失败,从而避免加锁开销。

这段优化出现在 chansendchanrecv 的开头:

// src/runtime/chan.go — https://github.com/golang/go/blob/go1.25.0/src/runtime/chan.go
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// ...
// Fast path: check for failed non-blocking operation without acquiring the lock.
//
// After observing that the channel is not closed, we observe that the channel is
// not ready for sending. Each of these observations is a single word-sized read
// (first c.closed and second full()).
// Because a closed channel cannot transition from 'ready for sending' to
// 'not ready for sending', even if the channel is closed between the two observations,
// they imply a moment between the two when the channel was both not yet closed
// and not ready for sending.
if !block && c.closed == 0 && full(c) {
return false
}
// ...
}

chanrecv 中同样有对应的快速路径:

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// ...
// Fast path: check for failed non-blocking operation without acquiring the lock.
if !block && empty(c) {
// After observing that the channel is not ready for receiving, we observe whether
// the channel is closed.
if atomic.Load(&c.closed) == 0 {
return
}
// The channel is irreversibly closed. Re-check whether the channel has any
// pending data to receive, which could have arrived between the empty and
// closed checks above.
if empty(c) {
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
}
// ...
}

7.2 快速路径的工作原理#

快速路径的核心思想是:利用 channel 状态的单调性,通过两次无锁读取推断出某个时刻的真实状态

flowchart TD A["非阻塞 send/recv"] --> B{"快速路径检查(无锁)"} B --> C["读取 c.closed"] C --> D{"c.closed != 0?"} D -- "是(已关闭)" --> E["走慢路径(加锁)"] D -- "否(未关闭)" --> F["读取 full()/empty()"] F --> G{"操作可以立即完成?"} G -- "是" --> H["走慢路径(加锁执行)"] G -- "否" --> I["直接返回 false<br/>避免加锁"] style I fill:#4CAF50,color:#fff style E fill:#FF9800,color:#fff style H fill:#FF9800,color:#fff

关键推理过程:

  1. 发送快速路径:先读 c.closed(未关闭),再读 full(c)(满了)。因为已关闭的 channel 不会从「可发送」变为「不可发送」,所以即使两次读取之间 channel 被关闭,也必然存在一个时刻 channel 既未关闭又满了——此时发送必然失败,直接返回 false 是安全的。

  2. 接收快速路径:先读 empty(c)(空了),再读 c.closed(未关闭)。因为空且未关闭的 channel 不可能成功接收,直接返回。

full()empty() 的实现也是无锁的:

// full 报告发送是否会阻塞(channel 是否满了)
func full(c *hchan) bool {
if c.dataqsiz == 0 {
// 无缓冲 channel:检查是否有等待接收者
// 假定指针读取是 relaxed-atomic 的
return c.recvq.first == nil
}
// 有缓冲 channel:检查 qcount
// 假定 uint 读取是 relaxed-atomic 的
return c.qcount == c.dataqsiz
}
// empty 报告接收是否会阻塞(channel 是否空了)
func empty(c *hchan) bool {
if c.dataqsiz == 0 {
return atomic.Loadp(unsafe.Pointer(&c.sendq.first)) == nil
}
if c.timer != nil {
c.timer.maybeRunChan(c)
}
return atomic.Loaduint(&c.qcount) == 0
}

7.3 快速路径的适用场景#

快速路径仅对非阻塞操作生效,即 select 中的 case(编译为 selectnbsend/selectnbrecv):

// select 中的非阻塞 send
func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
return chansend(c, elem, false, sys.GetCallerPC())
}
// select 中的非阻塞 recv
func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected, received bool) {
return chanrecv(c, elem, false)
}
场景快速路径是否生效原因
select + send生效block=false,可走快速路径
select + recv生效block=false,可走快速路径
普通 ch <- x不生效block=true,必须加锁
普通 <-ch不生效block=true,必须加锁

性能影响:在 select 中包含大量 case 时,快速路径可以显著减少锁竞争。每个 case 先做无锁检查,只有可能成功的 case 才会真正加锁,这降低了 selectgo 的开销。

8. reflect.Select:动态通道选择#

8.1 为什么需要 reflect.Select#

Go 的 select 语句要求 case 的数量在编译时确定。但在某些场景下,需要在运行时动态决定监听哪些 channel:

  • 动态合并多个 channel:数量在运行时才确定
  • 通用的多路复用器:如 fan-in 模式,输入 channel 数量可变
  • 反射框架:需要在运行时构建 channel 操作

reflect.Select 正是为了解决这个需求:

// src/reflect/value.go — https://github.com/golang/go/blob/go1.25.0/src/reflect/value.go
func Select(cases []SelectCase) (chosen int, Value, ok bool)

8.2 SelectCase 结构#

type SelectCase struct {
Dir SelectDir // 方向:Send、Recv 或 Default
Chan Value // channel 值(Dir != Default 时使用)
Send Value // 发送值(Dir == Send 时使用)
}
type SelectDir int
const (
SelectSend SelectDir = iota // case Chan <- Send
SelectRecv // case <-Chan:
SelectDefault // default
)

8.3 使用示例:动态 fan-in#

package main
import (
"fmt"
"reflect"
"time"
)
// dynamicFanIn 从任意数量的 channel 中接收数据
func dynamicFanIn(channels ...<-chan string) <-chan string {
out := make(chan string)
go func() {
defer close(out)
// 构建 SelectCase 切片
cases := make([]reflect.SelectCase, len(channels))
for i, ch := range channels {
cases[i] = reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(ch),
}
}
// 持续接收,直到所有 channel 都关闭
for len(cases) > 0 {
chosen, value, ok := reflect.Select(cases)
if !ok {
// chosen channel 已关闭,从列表中移除
cases = append(cases[:chosen], cases[chosen+1:]...)
continue
}
out <- value.String()
}
}()
return out
}
func main() {
ch1 := make(chan string, 3)
ch2 := make(chan string, 3)
ch3 := make(chan string, 3)
ch1 <- "from-ch1"
ch2 <- "from-ch2"
ch3 <- "from-ch3"
close(ch1)
close(ch2)
close(ch3)
merged := dynamicFanIn(ch1, ch2, ch3)
for msg := range merged {
fmt.Println(msg)
}
}

8.4 reflect.Select 的实现原理#

reflect.Select 内部调用 runtime.selectgo,与编译器生成的 select 语句走相同的运行时路径:

flowchart TD A["reflect.Select(cases)"] --> B["验证 SelectCase 切片"] B --> C["构建 runtime.scase 数组"] C --> D["调用 runtime.selectgo()"] D --> E["selectgo 执行标准流程:<br/>1. 洗牌<br/>2. 轮询<br/>3. 入队等待<br/>4. 阻塞"] E --> F["返回 chosen, value, ok"] style D fill:#4d96ff,color:#fff

关键实现细节:

  1. 参数转换reflect.SelectSelectCase 转换为 runtime.scase 结构,提取 hchan 指针和数据指针
  2. 公平性:与编译期 select 一样,selectgo 使用 Fisher-Yates 洗牌保证公平性
  3. 开销:相比编译期 selectreflect.Select 多了 SelectCase → scase 的转换开销和 reflect.Value 的装箱/拆箱开销

8.5 reflect.Select vs 编译期 select#

特性编译期 selectreflect.Select
case 数量编译时固定运行时动态
类型安全编译时检查运行时检查
性能最优有额外反射开销
适用场景已知 channel 集合动态 channel 集合
公平性保证

最佳实践:如果 channel 数量在编译时已知,优先使用编译期 select;只有在确实需要动态选择时才使用 reflect.Select

9. Channel 与 GC 的交互#

9.1 hchan 的 GC 扫描#

hchan 结构体中包含多个指针字段,GC 需要正确扫描它们以确保存活的 channel 数据不被回收:

type hchan struct {
qcount uint
dataqsiz uint
buf unsafe.Pointer // GC 需要扫描(指向缓冲区)
elemsize uint16
closed uint32
timer *timer // GC 需要扫描
elemtype *_type // GC 需要扫描(但实际是持久化数据,不需要 GC 跟踪)
sendx uint
recvx uint
recvq waitq // GC 需要扫描(包含 *sudog 指针)
sendq waitq // GC 需要扫描(包含 *sudog 指针)
bubble *synctestBubble
lock mutex
}

9.2 缓冲区的 GC 处理#

makechan 中对缓冲区的分配策略直接影响 GC 行为:

func makechan(t *chantype, size int) *hchan {
// ...
var c *hchan
switch {
case mem == 0:
// 无缓冲 channel:只分配 hchan 本身
c = (*hchan)(mallocgc(hchanSize, nil, true))
case !elem.Pointers():
// 元素不包含指针:hchan 和 buf 一次性分配
// GC 不需要扫描 buf 中的元素
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// 元素包含指针:buf 单独分配
// GC 需要扫描 buf 中的每个元素
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
// ...
}
flowchart TD A["makechan(t, size)"] --> B{"元素包含指针?"} B -- "否(如 chan int)" --> C["hchan + buf 一次性分配<br/>GC 不扫描 buf"] B -- "是(如 chan *int)" --> D["hchan 和 buf 分开分配<br/>GC 扫描 buf 中的指针"] subgraph 无指针元素["chan int(无指针元素)"] E1["┌─────────────────┐"] E2["│ hchan │"] E3["│ buf ──────────┐ │"] E4["│ ▼ │"] E5["│ [1][2][3][ ] │"] E6["└─────────────────┘"] E7["单次 mallocgc<br/>GC 扫描到 hchan 即可"] end subgraph 有指针元素["chan *int(有指针元素)"] F1["┌──────────┐ ┌──────────────┐"] F2["│ hchan │ │ buf │"] F3["│ buf ─────┼────▶│ [*p1][*p2] │"] F4["└──────────┘ └──────────────┘"] F5["两次分配<br/>GC 需扫描 buf 中每个指针"] end C --> E1 D --> F1 style C fill:#4CAF50,color:#fff style D fill:#FF9800,color:#fff

关键优化:当 channel 元素不包含指针时(如 chan intchan float64),hchan 和缓冲区一次性分配,且 GC 不需要扫描缓冲区内容。这减少了 GC 的扫描工作量,也减少了堆上的对象数量。

9.3 sudog 与栈写屏障#

channel 操作中一个特殊的 GC 问题是:无缓冲 channel 的发送/接收会直接在两个 goroutine 的栈之间复制数据

// sendDirect: src 在发送者栈上,dst 在接收者栈上
func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
dst := sg.elem
typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.Size_)
memmove(dst, src, t.Size_)
}
// recvDirect: dst 在接收者栈/堆上,src 在发送者栈上
func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) {
src := sg.elem
typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.Size_)
memmove(dst, src, t.Size_)
}

这打破了 GC 的一个基本假设:栈上的写入只由该 goroutine 自己完成。为了正确处理这种情况,Go 使用了批量写屏障typeBitsBulkBarrier):

flowchart LR subgraph G1["G1(发送者)"] A["栈:值 x"] end subgraph G2["G2(接收者)"] B["栈:待填充"] end A -->|"sendDirect<br/>memmove + 写屏障"| B subgraph GC["GC 处理"] C["typeBitsBulkBarrier<br/>确保 GC 看到跨栈指针"] end style GC fill:#FF9800,color:#fff

9.4 sudog 的 GC 可达性#

sudog 结构体代表一个在 channel 上等待的 goroutine。GC 通过以下路径确保 sudog 的可达性:

  1. 通过 goroutine 的 waitingg.waiting 指向当前 goroutine 等待的 sudog 链表
  2. 通过 channel 的 sendq/recvqhchan.sendqhchan.recvq 包含等待的 sudog
flowchart TD subgraph GCRoots["GC 根集"] G1["G(goroutine)"] CH["hchan(channel)"] end G1 -->|"g.waiting"| SD1["sudog"] CH -->|"sendq.first"| SD2["sudog"] CH -->|"recvq.first"| SD3["sudog"] SD1 -->|"elem"| DATA1["栈/堆上的数据"] SD2 -->|"elem"| DATA2["栈/堆上的数据"] SD3 -->|"elem"| DATA3["栈/堆上的数据"] SD1 -->|"g"| G1 style GCRootS fill:#4d96ff,color:#fff

注意sudog.elem 指向的数据可能在 goroutine 的栈上,而不是堆上。栈扫描器(stack scanner)需要正确处理这种情况。当 goroutine 在 channel 上等待时,gp.activeStackChans 标志被设置为 true,告知栈收缩机制此时不能安全地缩小栈,因为其他 goroutine 可能正在通过 sudog.elem 写入该栈。

9.5 channel 与 GC 的性能影响#

场景GC 影响
chan int(无指针元素)缓冲区不扫描,GC 开销低
chan *int(有指针元素)缓冲区每个元素都需扫描
大缓冲区 channel增加 GC 扫描工作量,可能延长 STW 时间
大量阻塞的 sudog增加可达性分析的工作量
无缓冲 channel + 跨栈复制需要写屏障,但无额外堆分配

最佳实践

  1. 优先使用不包含指针的元素类型(如 chan int 而非 chan *int),减少 GC 扫描
  2. 避免过大的缓冲区,大缓冲区会增加 GC 压力
  3. 及时关闭不再使用的 channel,让阻塞的 goroutine 得以释放

10. 性能考量与最佳实践#

10.1 channel 性能特性#

操作类型时间复杂度说明
发送/接收(无竞争)O(1)直接操作缓冲区或 sudog 队列
发送/接收(有锁竞争)O(1)*需要获取 hchan.lock
Select(n 个 case)O(n)需要遍历所有 case
关闭 channelO(m+n)m=发送者数量,n=接收者数量

7.2 最佳实践#

  1. channel 的大小选择

    • 无缓冲:用于同步信号
    • 有缓冲:用于解耦生产消费速率
    • 避免”无限大”的缓冲:make(chan T, 1<<20) 可能导致内存问题
  2. 谁关闭 channel

    • 只有一个发送者:发送者关闭
    • 多个发送者:使用额外信号或 sync.Once
    • 永远不要在接收端关闭
  3. 避免 goroutine 泄漏

// 错误:可能泄漏
func bad() {
ch := make(chan int)
go func() {
ch <- 42 // 如果没有人接收,永远阻塞
}()
// 没有<-ch
}
// 正确:使用 context 或 done channel
func good(ctx context.Context) {
ch := make(chan int, 1)
go func() {
select {
case ch <- 42:
case <-ctx.Done():
}
}()
}
  1. nil channel 的妙用
// nil channel 在 select 中会被忽略
var ch1, ch2 chan int
select {
case v := <-ch1: // ch1 为 nil,不会执行
fmt.Println(v)
case v := <-ch2: // ch2 为 nil,不会执行
fmt.Println(v)
}
// 动态启用/禁用 case
var sendCh chan<- int
if shouldSend {
sendCh = actualChannel // 非nil才会参与select
}

8. 总结#

Go 的并发模型建立在 CSP 理论之上,通过 goroutine 和 channel 提供了简洁而强大的并发编程能力:

概念关键点
goroutine轻量级、2KB 初始栈、由 Go runtime 调度
channelhchan 结构、环形缓冲区、sudog 等待队列、互斥锁保护
select随机公平、多路复用、支持 default 非阻塞
并发模式fan-out、pipeline、scatter-gather、worker pool

理解这些底层原理,能帮助我们:

  • 写出更高效的并发代码
  • 正确诊断和解决并发问题
  • 避免常见的并发陷阱(死锁、泄漏、竞态)

六、常见问题#

Q1:channel 底层是用锁实现的吗?#

是的,hchan 结构体包含一个 mutex。但 Go 做了优化:对于无缓冲 channel 的同步发送/接收,存在快速路径(fastpath)直接交换数据,无需获取锁。

Q2:为什么 channel 会引发 goroutine 泄漏?#

如果 goroutine 阻塞在 channel 操作上(发送到无人接收的 channel,或从无人发送的 channel 接收),且没有其他方式唤醒它,就会永久阻塞。使用 context 取消或 select+default 可避免。

Q3:nil channel 有什么用?#

nil channel 的发送和接收会永久阻塞,在 select 中使用 nil channel 可以动态禁用某个 case。这是 Go 并发编程中的常用技巧。

Q4:channel 和 mutex 该用哪个?#

channel 适合 goroutine 间通信和同步,mutex 适合共享数据的互斥访问。Go 的哲学是”不要通过共享内存来通信,而要通过通信来共享内存”,但实际中两者各有适用场景。

小结#

  • channel 底层是 hchan 结构体,包含环形缓冲区、mutex、发送/接收等待队列
  • 无缓冲 channel 同步操作有快速路径,避免锁开销
  • select 通过 lockorder 和 pollorder 避免死锁和饥饿
  • channel 与 GC 交互:hchan 中的 sendq/recvq 包含 sudog,GC 需要扫描这些指针
  • 正确使用 channel 的关键:避免泄漏(用 context 取消)、选择合适的缓冲大小

参考资料#

支持与分享

如果这篇文章对你有帮助,欢迎支持作者或分享给更多人

Go 协程与通道:CSP 模型实现
https://blog.souloss.com/posts/golang/go-channel/
作者
Souloss
发布于
2022-07-17
许可协议
CC BY-NC-SA 4.0

部分信息可能已经过时