1. 并发模式概述
1.1 为什么需要并发模式?
Go 语言的并发原语(goroutine、channel、select)虽然强大,但直接使用它们构建复杂系统容易出错。常见的并发问题包括:
- goroutine 泄漏:goroutine 永久阻塞,无法退出
- 资源竞争:多个 goroutine 并发访问共享资源
- 错误传播:并发任务的错误如何向上传递
- 取消与超时:如何优雅地取消正在进行的任务
并发模式是对这些问题的标准化解决方案,它们提供了可复用的设计模板。
1.2 模式分类
并发模式分类│├── 数据流模式│ ├── Pipeline(流水线)│ ├── Fan-out/Fan-in(扇出/扇入)│ └── Scatter-Gather(分发-聚合)│├── 资源管理模式│ ├── Worker Pool(工作池)│ ├── Semaphore(信号量)│ └── Bounded Parallelism(有界并行)│├── 控制模式│ ├── Context Cancellation(上下文取消)│ ├── Timeout(超时控制)│ └── Graceful Shutdown(优雅关闭)│└── 错误处理模式 ├── errgroup(错误组) ├── First Error Wins(首个错误胜出) └── Error Aggregation(错误聚合)2. Pipeline 模式
2.1 模式定义
Pipeline 模式将复杂的数据处理流程分解为多个阶段(Stage),每个阶段通过 channel 连接,数据像流水线一样依次流过各个阶段。
2.2 基础实现
// Stage 1: 生成器——产生数据源func generate(nums ...int) <-chan int { out := make(chan int) go func() { defer close(out) for _, n := range nums { out <- n } }() return out}
// Stage 2: 平方运算——转换数据func square(in <-chan int) <-chan int { out := make(chan int) go func() { defer close(out) for n := range in { out <- n * n } }() return out}
// Stage 3: 过滤——筛选符合条件的值func filter(in <-chan int, predicate func(int) bool) <-chan int { out := make(chan int) go func() { defer close(out) for n := range in { if predicate(n) { out <- n } } }() return out}
// 使用示例func main() { // 构建流水线 nums := generate(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) squares := square(nums) evens := filter(squares, func(n int) bool { return n > 25 })
// 消费结果 for n := range evens { fmt.Println(n) }}2.3 可取消的 Pipeline
实际场景中,Pipeline 需要支持取消操作:
func generateContext(ctx context.Context, nums ...int) <-chan int { out := make(chan int) go func() { defer close(out) for _, n := range nums { select { case out <- n: case <-ctx.Done(): return // 取消时退出 } } }() return out}
func squareContext(ctx context.Context, in <-chan int) <-chan int { out := make(chan int) go func() { defer close(out) for { select { case n, ok := <-in: if !ok { return } select { case out <- n * n: case <-ctx.Done(): return } case <-ctx.Done(): return } } }() return out}2.4 Pipeline 的优缺点
| 优点 | 缺点 |
|---|---|
| 阶段解耦,易于测试 | 每个 Stage 创建 goroutine |
| 可组合性强 | channel 开销 |
| 天然支持流式处理 | 背压处理复杂 |
| 便于并行化(结合 Fan-out) | 错误处理需要额外设计 |
3. Fan-out/Fan-in 模式
3.1 模式定义
- Fan-out:多个 goroutine 从同一个 channel 读取数据,并行处理
- Fan-in:多个 goroutine 的结果合并到一个 channel
3.2 Fan-out 实现
// 并行执行多个 workerfunc fanOut(worker func(<-chan Task) <-chan Result, jobs <-chan Task, workers int) []<-chan Result { results := make([]<-chan Result, workers) for i := 0; i < workers; i++ { results[i] = worker(jobs) } return results}
func worker(jobs <-chan Task) <-chan Result { out := make(chan Result) go func() { defer close(out) for job := range jobs { out <- process(job) } }() return out}3.3 Fan-in 实现
Fan-in 的核心依赖 sync.WaitGroup 来同步多个 goroutine 的完成状态。
// 合并多个 channel 的结果func fanIn(ctx context.Context, channels ...<-chan Result) <-chan Result { out := make(chan Result)
var wg sync.WaitGroup wg.Add(len(channels))
// 为每个输入 channel 启动一个 goroutine for _, ch := range channels { go func(c <-chan Result) { defer wg.Done() for result := range c { select { case out <- result: case <-ctx.Done(): return } } }(ch) }
// 等待所有 channel 关闭后关闭输出 go func() { wg.Wait() close(out) }()
return out}3.4 完整示例:并行 URL 抓取
func fetchURLs(urls []string) ([]Page, error) { ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel()
// 创建任务 channel jobs := make(chan string, len(urls)) for _, url := range urls { jobs <- url } close(jobs)
// Fan-out:启动多个 worker 并行抓取 numWorkers := min(5, len(urls)) results := fanOut(fetchWorker, jobs, numWorkers)
// Fan-in:合并结果 merged := fanIn(ctx, results...)
// 收集结果 var pages []Page for result := range merged { if result.Err != nil { return nil, result.Err } pages = append(pages, result.Page) }
return pages, nil}
func fetchWorker(jobs <-chan string) <-chan FetchResult { out := make(chan FetchResult) go func() { defer close(out) client := http.Client{Timeout: 10 * time.Second} for url := range jobs { resp, err := client.Get(url) if err != nil { out <- FetchResult{Err: err} continue } body, _ := io.ReadAll(resp.Body) resp.Body.Close() out <- FetchResult{Page: Page{URL: url, Content: string(body)}} } }() return out}4. Worker Pool 模式
4.1 模式定义
Worker Pool 维护一组固定数量的 worker goroutine,从任务队列中获取任务执行。它限制了并发数量,避免资源耗尽。
4.2 基础实现
type WorkerPool struct { tasks chan Task results chan Result workers int wg sync.WaitGroup}
func NewWorkerPool(workers, taskCapacity int) *WorkerPool { return &WorkerPool{ tasks: make(chan Task, taskCapacity), results: make(chan Result, taskCapacity), workers: workers, }}
func (p *WorkerPool) Start() { for i := 0; i < p.workers; i++ { p.wg.Add(1) go p.worker(i) }}
func (p *WorkerPool) worker(id int) { defer p.wg.Done() for task := range p.tasks { result := process(task) p.results <- result }}
func (p *WorkerPool) Submit(task Task) { p.tasks <- task}
func (p *WorkerPool) Stop() { close(p.tasks) p.wg.Wait() close(p.results)}
func (p *WorkerPool) Results() <-chan Result { return p.results}4.3 支持取消的 Worker Pool
type CancellableWorkerPool struct { tasks chan Task results chan Result workers int ctx context.Context cancel context.CancelFunc wg sync.WaitGroup}
func NewCancellableWorkerPool(ctx context.Context, workers, capacity int) *CancellableWorkerPool { childCtx, cancel := context.WithCancel(ctx) return &CancellableWorkerPool{ tasks: make(chan Task, capacity), results: make(chan Result, capacity), workers: workers, ctx: childCtx, cancel: cancel, }}
func (p *CancellableWorkerPool) Start() { for i := 0; i < p.workers; i++ { p.wg.Add(1) go p.worker(i) }}
func (p *CancellableWorkerPool) worker(id int) { defer p.wg.Done() for { select { case task, ok := <-p.tasks: if !ok { return } result := p.processWithCancel(task) select { case p.results <- result: case <-p.ctx.Done(): return } case <-p.ctx.Done(): return } }}
func (p *CancellableWorkerPool) processWithCancel(task Task) Result { // 在处理过程中也检查取消信号 done := make(chan Result, 1) go func() { done <- process(task) }()
select { case result := <-done: return result case <-p.ctx.Done(): return Result{Err: p.ctx.Err()} }}
func (p *CancellableWorkerPool) Cancel() { p.cancel()}
func (p *CancellableWorkerPool) Stop() { close(p.tasks) p.wg.Wait() close(p.results)}4.4 Worker Pool vs 无限制 Goroutine
// 危险:无限制创建 goroutinefunc dangerous(urls []string) { for _, url := range urls { go fetch(url) // 可能创建数千个 goroutine }}
// 安全:使用 Worker Pool 限制并发func safe(urls []string, maxWorkers int) { pool := NewWorkerPool(maxWorkers, len(urls)) pool.Start()
for _, url := range urls { pool.Submit(Task{URL: url}) }
go func() { pool.Stop() }()
for result := range pool.Results() { // 处理结果 }}5. Context 上下文传递与取消
5.1 Context 接口
type Context interface { // 返回 context 被取消的截止时间 Deadline() (deadline time.Time, ok bool)
// 返回一个 channel,当 context 被取消时关闭 Done() <-chan struct{}
// 返回取消原因 Err() error
// 获取与 context 关联的值 Value(key any) any}5.2 Context 树形结构
当父 Context 取消时,所有子 Context 都会被取消
5.3 Context 取消传播
func operation(ctx context.Context) error { // 启动子任务 ctx, cancel := context.WithCancel(ctx) defer cancel() // 确保资源释放
// 启动多个子任务 results := make(chan Result, 3) for i := 0; i < 3; i++ { go func(id int) { results <- subOperation(ctx, id) }(i) }
// 等待第一个结果或取消 select { case result := <-results: cancel() // 取消其他子任务 return result.Err case <-ctx.Done(): return ctx.Err() }}
func subOperation(ctx context.Context, id int) Result { // 定期检查取消信号 for i := 0; i < 100; i++ { select { case <-ctx.Done(): return Result{Err: ctx.Err()} default: // 执行工作 time.Sleep(10 * time.Millisecond) } } return Result{Value: id}}5.4 Context 使用原则
// 正确:将 Context 作为第一个参数func DoSomething(ctx context.Context, arg Arg) error { // ...}
// 错误:将 Context 放在其他位置func DoSomething(arg Arg, ctx context.Context) error { // ...}
// 正确:不要将 Context 存储在结构体中(少数例外:HTTP Handler)type Handler struct { // ctx 不应该作为字段}
// 正确:context.Background() 作为根 Contextctx := context.Background()
// 正确:传递取消原因func process(ctx context.Context) error { ctx, cancel := context.WithCancelCause(ctx) go func() { if err := checkSomething(); err != nil { cancel(fmt.Errorf("check failed: %w", err)) } }() <-ctx.Done() return context.Cause(ctx) // 获取具体的取消原因}6. 超时控制
6.1 使用 context.WithTimeout
func fetchWithTimeout(ctx context.Context, url string) (*Response, error) { // 创建带超时的 context ctx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel()
req, err := http.NewRequestWithContext(ctx, "GET", url, nil) if err != nil { return nil, err }
resp, err := http.DefaultClient.Do(req) if err != nil { if errors.Is(err, context.DeadlineExceeded) { return nil, fmt.Errorf("request timed out: %w", err) } return nil, err }
return resp, nil}6.2 使用 time.After 实现简单超时
func simpleTimeout(timeout time.Duration) error { result := make(chan error, 1)
go func() { result <- doWork() }()
select { case err := <-result: return err case <-time.After(timeout): return fmt.Errorf("operation timed out after %v", timeout) }}6.3 超时与取消的区别
超时控制流程:
开始 ────▶ 执行任务 ────▶ 完成 │ │ (超过指定时间) │ └───────▶ 返回 DeadlineExceeded 错误
取消控制流程:
开始 ────▶ 执行任务 ────▶ 完成 │ │ (调用 cancel()) │ └───────▶ 返回 Canceled 错误6.4 多层超时控制
func multiLayerTimeout() error { // 外层:总超时 30 秒 ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel()
// 阶段1:数据库查询,超时 10 秒 dbCtx, dbCancel := context.WithTimeout(ctx, 10*time.Second) data, err := queryDB(dbCtx) dbCancel() if err != nil { return fmt.Errorf("database query: %w", err) }
// 阶段2:外部 API 调用,超时 15 秒 apiCtx, apiCancel := context.WithTimeout(ctx, 15*time.Second) result, err := callAPI(apiCtx, data) apiCancel() if err != nil { return fmt.Errorf("api call: %w", err) }
return nil}7. errgroup 并发错误处理
7.1 golang.org/x/sync/errgroup
errgroup 提供了并发任务组的管理,当任一任务出错时,其他任务会被取消。
import "golang.org/x/sync/errgroup"
func fetchAll(urls []string) ([]Page, error) { g, ctx := errgroup.WithContext(context.Background()) pages := make([]Page, len(urls))
for i, url := range urls { i, url := i, url // 捕获循环变量 g.Go(func() error { // 如果其他 goroutine 出错,ctx 会被取消 page, err := fetchPage(ctx, url) if err != nil { return err } pages[i] = page return nil }) }
// 等待所有任务完成,返回第一个错误(如果有) if err := g.Wait(); err != nil { return nil, err }
return pages, nil}7.2 errgroup 工作原理
errgroup 执行流程:
┌─────────────────────────────────────────────────────┐│ errgroup ││ ││ ┌─────────┐ ┌─────────┐ ┌─────────┐ ││ │ Task 1 │ │ Task 2 │ │ Task 3 │ ││ │ (成功) │ │ (出错) │ │ (运行中)│ ││ └────┬────┘ └────┬────┘ └────┬────┘ ││ │ │ │ ││ │ ┌───▼───┐ │ ││ │ │ error │◀────────┘ ││ │ └───┬───┘ ctx 被取消 ││ │ │ ││ └─────────────┼─────────────────┐ ││ ▼ │ ││ ┌─────────────┐ │ ││ │ g.Wait() │◀─────────┘ ││ │ 返回 error │ ││ └─────────────┘ │└─────────────────────────────────────────────────────┘7.3 带并发限制的 errgroup
func fetchWithLimit(urls []string, maxConcurrent int) ([]Page, error) { g, ctx := errgroup.WithContext(context.Background())
// 使用信号量限制并发数 sem := make(chan struct{}, maxConcurrent) pages := make([]Page, len(urls))
for i, url := range urls { i, url := i, url sem <- struct{}{} // 获取信号量
g.Go(func() error { defer func() { <-sem }() // 释放信号量
page, err := fetchPage(ctx, url) if err != nil { return err } pages[i] = page return nil }) }
if err := g.Wait(); err != nil { return nil, err }
return pages, nil}7.4 errgroup vs sync.WaitGroup
| 特性 | sync.WaitGroup | errgroup |
|---|---|---|
| 错误处理 | 需要手动实现 | 自动收集首个错误 |
| 取消机制 | 无 | 自动取消其他任务 |
| 使用复杂度 | 较高 | 较低 |
| Context 集成 | 需要手动集成 | 原生支持 |
| 适用场景 | 无需错误处理 | 需要错误处理和取消 |
8. 资源泄漏防护
8.1 goroutine 泄漏检测
使用 runtime 监控 goroutine 数量:
func monitorGoroutines() { ticker := time.NewTicker(5 * time.Second) for range ticker.C { count := runtime.NumGoroutine() log.Printf("current goroutines: %d", count) if count > 100 { log.Printf("WARNING: too many goroutines!") } }}8.2 常见泄漏场景及修复
// 场景1:无缓冲 channel 阻塞func leak1() { ch := make(chan int) go func() { ch <- 42 // 没有接收者,永久阻塞 }()}
// 修复:使用带缓冲 channel 或确保有接收者func fixed1() { ch := make(chan int, 1) go func() { ch <- 42 }() <-ch}
// 场景2:select 缺少退出条件func leak2() { ch := make(chan int) go func() { for { select { case v := <-ch: fmt.Println(v) // 缺少退出条件 } } }()}
// 修复:添加 context 取消func fixed2(ctx context.Context) { ch := make(chan int) go func() { for { select { case v := <-ch: fmt.Println(v) case <-ctx.Done(): return } } }()}
// 场景3:阻塞在 nil channelfunc leak3() { var ch chan int // nil channel go func() { <-ch // 永久阻塞 }()}
// 修复:确保 channel 已初始化func fixed3() { ch := make(chan int) close(ch) go func() { <-ch // 接收零值后退出 }()}8.3 使用 pprof 诊断泄漏
import _ "net/http/pprof"
func main() { go func() { http.ListenAndServe("localhost:6060", nil) }() // 访问 http://localhost:6060/debug/pprof/goroutine?debug=1 // 查看 goroutine 堆栈}9. 常见并发陷阱
9.1 闭包捕获循环变量
// 错误:所有 goroutine 捕获同一个变量func wrong() { for i := 0; i < 3; i++ { go func() { fmt.Println(i) // 可能输出 3, 3, 3 }() }}
// 正确:创建局部变量副本func right() { for i := 0; i < 3; i++ { i := i // 捕获当前值 go func() { fmt.Println(i) // 输出 0, 1, 2 }() }}
// 正确:作为参数传递func alsoRight() { for i := 0; i < 3; i++ { go func(n int) { fmt.Println(n) }(i) }}9.2 向已关闭的 channel 发送
// 错误:向已关闭 channel 发送会 panicfunc wrong() { ch := make(chan int) close(ch) ch <- 1 // panic: send on closed channel}
// 正确:使用 defer-recover 或确保只有一个关闭者func right() { ch := make(chan int, 1)
// 使用 sync.Once 确保只关闭一次 var once sync.Once once.Do(func() { close(ch) })}9.3 关闭 nil channel
// 错误:关闭 nil channel 会 panicfunc wrong() { var ch chan int close(ch) // panic: close of nil channel}
// 正确:检查 channel 是否为 nilfunc right(ch chan int) { if ch != nil { close(ch) }}9.4 死锁:循环等待
// 死锁场景func deadlock() { ch1 := make(chan int) ch2 := make(chan int)
go func() { <-ch1 ch2 <- 1 }()
<-ch2 // 等待 ch2,但 ch2 需要 ch1 先被发送 ch1 <- 1 // 永远无法到达}
// 修复:使用缓冲 channel 或调整顺序func fixed() { ch1 := make(chan int, 1) ch2 := make(chan int, 1)
go func() { <-ch1 ch2 <- 1 }()
ch1 <- 1 <-ch2}9.5 竞态条件
// 竞态条件func race() { var counter int var wg sync.WaitGroup
for i := 0; i < 1000; i++ { wg.Add(1) go func() { defer wg.Done() counter++ // 竞态! }() } wg.Wait() fmt.Println(counter) // 结果不确定}
// 使用互斥锁func withMutex() { var counter int var mu sync.Mutex var wg sync.WaitGroup
for i := 0; i < 1000; i++ { wg.Add(1) go func() { defer wg.Done() mu.Lock() counter++ mu.Unlock() }() } wg.Wait()}
// 使用 atomicfunc withAtomic() { var counter int64 var wg sync.WaitGroup
for i := 0; i < 1000; i++ { wg.Add(1) go func() { defer wg.Done() atomic.AddInt64(&counter, 1) }() } wg.Wait()}
// 使用 channelfunc withChannel() { counter := make(chan int, 1) counter <- 0 // 初始化
var wg sync.WaitGroup for i := 0; i < 1000; i++ { wg.Add(1) go func() { defer wg.Done() c := <-counter counter <- c + 1 }() } wg.Wait() final := <-counter fmt.Println(final)}10. 总结
10.1 模式选择指南
场景 ──▶ 推荐模式
数据需要多阶段处理 ──▶ Pipeline多任务并行执行 ──▶ Fan-out/Fan-in限制并发数量 ──▶ Worker Pool需要取消/超时控制 ──▶ Context并发任务需要错误处理 ──▶ errgroup10.2 并发安全检查清单
- 所有 goroutine 都有退出路径
- 使用 context 实现取消传播
- 避免向已关闭的 channel 发送
- 共享资源使用互斥锁或 atomic
- 使用
go run -race检测竞态 - 监控 goroutine 数量,防止泄漏
- 正确处理并发任务的错误
10.3 工具推荐
| 工具 | 用途 |
|---|---|
go run -race | 竞态条件检测 |
net/http/pprof | goroutine 分析 |
golang.org/x/sync/errgroup | 并发错误处理 |
golang.org/x/tools/gopls | 静态分析 |
八、常见问题
Q1:Pipeline 模式中如何处理错误?
每个阶段遇到错误时通过专门的 error channel 传递,或使用 errgroup 管理所有 goroutine 的错误。推荐 errgroup,它会在任一 goroutine 出错时取消所有其他 goroutine。
Q2:Worker Pool 的 worker 数量怎么确定?
通常设为 CPU 核心数(runtime.NumCPU())对于 CPU 密集型任务,或 2-10 倍核心数对于 IO 密集型任务。实际值应根据基准测试调整。
Q3:Fan-out 和 Worker Pool 有什么区别?
Fan-out 是将同一输入分发给多个消费者并行处理(广播),Worker Pool 是多个 worker 从同一 channel 消费任务(竞争)。Fan-out 适合相同数据的不同处理,Worker Pool 适合不同数据的相同处理。
Q4:errgroup 和 WaitGroup 怎么选?
errgroup 在 WaitGroup 基础上增加了错误收集和 context 取消功能。如果需要知道哪个 goroutine 出错或需要取消,用 errgroup;如果只需等待完成,用 WaitGroup。
小结
- Pipeline 模式将处理流程分解为多个阶段,通过 channel 连接,实现流式处理
- Fan-out/Fan-in 模式实现并行分发和结果汇聚,提高吞吐量
- Worker Pool 限制并发 goroutine 数量,避免资源耗尽
- Context 用于取消和超时控制,是 Go 并发编程的基础设施
- errgroup 简化并发错误管理,支持错误收集和自动取消
参考资料
-
Go Channel 实现 — channel 底层源码
-
Go WaitGroup 实现 — WaitGroup 源码
-
Go Context 实现 — context 包源码
-
Go errgroup 实现 — errgroup 官方文档
-
Go 并发模式 — Go 官方 Pipeline 模式教程
支持与分享
如果这篇文章对你有帮助,欢迎支持作者或分享给更多人
部分信息可能已经过时






