mobile wallpaper 1mobile wallpaper 2mobile wallpaper 3mobile wallpaper 4
1347 字
4 分钟
Go 并发模式实战
2022-08-31

1. 并发模式概述#

1.1 为什么需要并发模式?#

Go 语言的并发原语(goroutine、channel、select)虽然强大,但直接使用它们构建复杂系统容易出错。常见的并发问题包括:

  • goroutine 泄漏:goroutine 永久阻塞,无法退出
  • 资源竞争:多个 goroutine 并发访问共享资源
  • 错误传播:并发任务的错误如何向上传递
  • 取消与超时:如何优雅地取消正在进行的任务

并发模式是对这些问题的标准化解决方案,它们提供了可复用的设计模板。

1.2 模式分类#

并发模式分类
├── 数据流模式
│ ├── Pipeline(流水线)
│ ├── Fan-out/Fan-in(扇出/扇入)
│ └── Scatter-Gather(分发-聚合)
├── 资源管理模式
│ ├── Worker Pool(工作池)
│ ├── Semaphore(信号量)
│ └── Bounded Parallelism(有界并行)
├── 控制模式
│ ├── Context Cancellation(上下文取消)
│ ├── Timeout(超时控制)
│ └── Graceful Shutdown(优雅关闭)
└── 错误处理模式
├── errgroup(错误组)
├── First Error Wins(首个错误胜出)
└── Error Aggregation(错误聚合)

2. Pipeline 模式#

2.1 模式定义#

Pipeline 模式将复杂的数据处理流程分解为多个阶段(Stage),每个阶段通过 channel 连接,数据像流水线一样依次流过各个阶段。

graph LR SRC["数据源"] --> S1["Stage 1<br/>生成器"] S1 -->|"chan int"| S2["Stage 2<br/>转换器"] S2 -->|"chan int"| S3["Stage 3<br/>过滤器"] S3 -->|"chan int"| S4["Stage 4<br/>消费者"] S4 --> OUT["结果"] style SRC fill:#2196F3,color:#fff style S1 fill:#4CAF50,color:#fff style S2 fill:#FF9800,color:#fff style S3 fill:#9C27B0,color:#fff style S4 fill:#F44336,color:#fff style OUT fill:#2196F3,color:#fff

2.2 基础实现#

// Stage 1: 生成器——产生数据源
func generate(nums ...int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for _, n := range nums {
out <- n
}
}()
return out
}
// Stage 2: 平方运算——转换数据
func square(in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
out <- n * n
}
}()
return out
}
// Stage 3: 过滤——筛选符合条件的值
func filter(in <-chan int, predicate func(int) bool) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
if predicate(n) {
out <- n
}
}
}()
return out
}
// 使用示例
func main() {
// 构建流水线
nums := generate(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
squares := square(nums)
evens := filter(squares, func(n int) bool { return n > 25 })
// 消费结果
for n := range evens {
fmt.Println(n)
}
}

2.3 可取消的 Pipeline#

实际场景中,Pipeline 需要支持取消操作:

func generateContext(ctx context.Context, nums ...int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for _, n := range nums {
select {
case out <- n:
case <-ctx.Done():
return // 取消时退出
}
}
}()
return out
}
func squareContext(ctx context.Context, in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for {
select {
case n, ok := <-in:
if !ok {
return
}
select {
case out <- n * n:
case <-ctx.Done():
return
}
case <-ctx.Done():
return
}
}
}()
return out
}

2.4 Pipeline 的优缺点#

优点缺点
阶段解耦,易于测试每个 Stage 创建 goroutine
可组合性强channel 开销
天然支持流式处理背压处理复杂
便于并行化(结合 Fan-out)错误处理需要额外设计

3. Fan-out/Fan-in 模式#

3.1 模式定义#

  • Fan-out:多个 goroutine 从同一个 channel 读取数据,并行处理
  • Fan-in:多个 goroutine 的结果合并到一个 channel
graph LR subgraph "Fan-out(扇出)" P["Producer"] -->|"chan Task"| W1["Worker 1"] P -->|"chan Task"| W2["Worker 2"] P -->|"chan Task"| W3["Worker 3"] end subgraph "Fan-in(扇入)" W1 -->|"chan Result"| M["Merger"] W2 -->|"chan Result"| M W3 -->|"chan Result"| M end M --> OUT["结果"] style P fill:#2196F3,color:#fff style W1 fill:#4CAF50,color:#fff style W2 fill:#4CAF50,color:#fff style W3 fill:#4CAF50,color:#fff style M fill:#FF9800,color:#fff style OUT fill:#9C27B0,color:#fff

3.2 Fan-out 实现#

// 并行执行多个 worker
func fanOut(worker func(<-chan Task) <-chan Result, jobs <-chan Task, workers int) []<-chan Result {
results := make([]<-chan Result, workers)
for i := 0; i < workers; i++ {
results[i] = worker(jobs)
}
return results
}
func worker(jobs <-chan Task) <-chan Result {
out := make(chan Result)
go func() {
defer close(out)
for job := range jobs {
out <- process(job)
}
}()
return out
}

3.3 Fan-in 实现#

Fan-in 的核心依赖 sync.WaitGroup 来同步多个 goroutine 的完成状态。

// 合并多个 channel 的结果
func fanIn(ctx context.Context, channels ...<-chan Result) <-chan Result {
out := make(chan Result)
var wg sync.WaitGroup
wg.Add(len(channels))
// 为每个输入 channel 启动一个 goroutine
for _, ch := range channels {
go func(c <-chan Result) {
defer wg.Done()
for result := range c {
select {
case out <- result:
case <-ctx.Done():
return
}
}
}(ch)
}
// 等待所有 channel 关闭后关闭输出
go func() {
wg.Wait()
close(out)
}()
return out
}

3.4 完整示例:并行 URL 抓取#

func fetchURLs(urls []string) ([]Page, error) {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
// 创建任务 channel
jobs := make(chan string, len(urls))
for _, url := range urls {
jobs <- url
}
close(jobs)
// Fan-out:启动多个 worker 并行抓取
numWorkers := min(5, len(urls))
results := fanOut(fetchWorker, jobs, numWorkers)
// Fan-in:合并结果
merged := fanIn(ctx, results...)
// 收集结果
var pages []Page
for result := range merged {
if result.Err != nil {
return nil, result.Err
}
pages = append(pages, result.Page)
}
return pages, nil
}
func fetchWorker(jobs <-chan string) <-chan FetchResult {
out := make(chan FetchResult)
go func() {
defer close(out)
client := http.Client{Timeout: 10 * time.Second}
for url := range jobs {
resp, err := client.Get(url)
if err != nil {
out <- FetchResult{Err: err}
continue
}
body, _ := io.ReadAll(resp.Body)
resp.Body.Close()
out <- FetchResult{Page: Page{URL: url, Content: string(body)}}
}
}()
return out
}

4. Worker Pool 模式#

4.1 模式定义#

Worker Pool 维护一组固定数量的 worker goroutine,从任务队列中获取任务执行。它限制了并发数量,避免资源耗尽。

graph TD IN["任务输入"] --> TQ["任务队列<br/>chan Task"] TQ -->|"分发"| W1["Worker 1<br/>goroutine"] TQ -->|"分发"| W2["Worker 2<br/>goroutine"] TQ -->|"分发"| W3["Worker N<br/>goroutine"] W1 --> RQ["结果队列<br/>chan Result"] W2 --> RQ W3 --> RQ RQ --> OUT["结果输出"] style IN fill:#2196F3,color:#fff style TQ fill:#FF9800,color:#fff style W1 fill:#4CAF50,color:#fff style W2 fill:#4CAF50,color:#fff style W3 fill:#4CAF50,color:#fff style RQ fill:#FF9800,color:#fff style OUT fill:#9C27B0,color:#fff

4.2 基础实现#

type WorkerPool struct {
tasks chan Task
results chan Result
workers int
wg sync.WaitGroup
}
func NewWorkerPool(workers, taskCapacity int) *WorkerPool {
return &WorkerPool{
tasks: make(chan Task, taskCapacity),
results: make(chan Result, taskCapacity),
workers: workers,
}
}
func (p *WorkerPool) Start() {
for i := 0; i < p.workers; i++ {
p.wg.Add(1)
go p.worker(i)
}
}
func (p *WorkerPool) worker(id int) {
defer p.wg.Done()
for task := range p.tasks {
result := process(task)
p.results <- result
}
}
func (p *WorkerPool) Submit(task Task) {
p.tasks <- task
}
func (p *WorkerPool) Stop() {
close(p.tasks)
p.wg.Wait()
close(p.results)
}
func (p *WorkerPool) Results() <-chan Result {
return p.results
}

4.3 支持取消的 Worker Pool#

type CancellableWorkerPool struct {
tasks chan Task
results chan Result
workers int
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
}
func NewCancellableWorkerPool(ctx context.Context, workers, capacity int) *CancellableWorkerPool {
childCtx, cancel := context.WithCancel(ctx)
return &CancellableWorkerPool{
tasks: make(chan Task, capacity),
results: make(chan Result, capacity),
workers: workers,
ctx: childCtx,
cancel: cancel,
}
}
func (p *CancellableWorkerPool) Start() {
for i := 0; i < p.workers; i++ {
p.wg.Add(1)
go p.worker(i)
}
}
func (p *CancellableWorkerPool) worker(id int) {
defer p.wg.Done()
for {
select {
case task, ok := <-p.tasks:
if !ok {
return
}
result := p.processWithCancel(task)
select {
case p.results <- result:
case <-p.ctx.Done():
return
}
case <-p.ctx.Done():
return
}
}
}
func (p *CancellableWorkerPool) processWithCancel(task Task) Result {
// 在处理过程中也检查取消信号
done := make(chan Result, 1)
go func() {
done <- process(task)
}()
select {
case result := <-done:
return result
case <-p.ctx.Done():
return Result{Err: p.ctx.Err()}
}
}
func (p *CancellableWorkerPool) Cancel() {
p.cancel()
}
func (p *CancellableWorkerPool) Stop() {
close(p.tasks)
p.wg.Wait()
close(p.results)
}

4.4 Worker Pool vs 无限制 Goroutine#

// 危险:无限制创建 goroutine
func dangerous(urls []string) {
for _, url := range urls {
go fetch(url) // 可能创建数千个 goroutine
}
}
// 安全:使用 Worker Pool 限制并发
func safe(urls []string, maxWorkers int) {
pool := NewWorkerPool(maxWorkers, len(urls))
pool.Start()
for _, url := range urls {
pool.Submit(Task{URL: url})
}
go func() {
pool.Stop()
}()
for result := range pool.Results() {
// 处理结果
}
}

5. Context 上下文传递与取消#

5.1 Context 接口#

type Context interface {
// 返回 context 被取消的截止时间
Deadline() (deadline time.Time, ok bool)
// 返回一个 channel,当 context 被取消时关闭
Done() <-chan struct{}
// 返回取消原因
Err() error
// 获取与 context 关联的值
Value(key any) any
}

5.2 Context 树形结构#

graph TD BG["context.Background()"] --> VT["WithValue<br/>parent, k, v"] VT --> TO["WithTimeout<br/>3s"] VT --> CA["WithCancel"] VT --> DL["WithDeadline<br/>deadline"] TO --> T1["子任务 1"] CA --> T2["子任务 2"] DL --> T3["子任务 3"] CA -.->|"cancel() 传播"| T2 CA -.->|"cancel() 传播"| T1 CA -.->|"cancel() 传播"| T3 style BG fill:#2196F3,color:#fff style VT fill:#607D8B,color:#fff style TO fill:#FF9800,color:#fff style CA fill:#F44336,color:#fff style DL fill:#9C27B0,color:#fff style T1 fill:#4CAF50,color:#fff style T2 fill:#4CAF50,color:#fff style T3 fill:#4CAF50,color:#fff

当父 Context 取消时,所有子 Context 都会被取消

5.3 Context 取消传播#

func operation(ctx context.Context) error {
// 启动子任务
ctx, cancel := context.WithCancel(ctx)
defer cancel() // 确保资源释放
// 启动多个子任务
results := make(chan Result, 3)
for i := 0; i < 3; i++ {
go func(id int) {
results <- subOperation(ctx, id)
}(i)
}
// 等待第一个结果或取消
select {
case result := <-results:
cancel() // 取消其他子任务
return result.Err
case <-ctx.Done():
return ctx.Err()
}
}
func subOperation(ctx context.Context, id int) Result {
// 定期检查取消信号
for i := 0; i < 100; i++ {
select {
case <-ctx.Done():
return Result{Err: ctx.Err()}
default:
// 执行工作
time.Sleep(10 * time.Millisecond)
}
}
return Result{Value: id}
}

5.4 Context 使用原则#

// 正确:将 Context 作为第一个参数
func DoSomething(ctx context.Context, arg Arg) error {
// ...
}
// 错误:将 Context 放在其他位置
func DoSomething(arg Arg, ctx context.Context) error {
// ...
}
// 正确:不要将 Context 存储在结构体中(少数例外:HTTP Handler)
type Handler struct {
// ctx 不应该作为字段
}
// 正确:context.Background() 作为根 Context
ctx := context.Background()
// 正确:传递取消原因
func process(ctx context.Context) error {
ctx, cancel := context.WithCancelCause(ctx)
go func() {
if err := checkSomething(); err != nil {
cancel(fmt.Errorf("check failed: %w", err))
}
}()
<-ctx.Done()
return context.Cause(ctx) // 获取具体的取消原因
}

6. 超时控制#

6.1 使用 context.WithTimeout#

func fetchWithTimeout(ctx context.Context, url string) (*Response, error) {
// 创建带超时的 context
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
if err != nil {
return nil, err
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
if errors.Is(err, context.DeadlineExceeded) {
return nil, fmt.Errorf("request timed out: %w", err)
}
return nil, err
}
return resp, nil
}

6.2 使用 time.After 实现简单超时#

func simpleTimeout(timeout time.Duration) error {
result := make(chan error, 1)
go func() {
result <- doWork()
}()
select {
case err := <-result:
return err
case <-time.After(timeout):
return fmt.Errorf("operation timed out after %v", timeout)
}
}

6.3 超时与取消的区别#

超时控制流程:
开始 ────▶ 执行任务 ────▶ 完成
│ (超过指定时间)
└───────▶ 返回 DeadlineExceeded 错误
取消控制流程:
开始 ────▶ 执行任务 ────▶ 完成
│ (调用 cancel())
└───────▶ 返回 Canceled 错误

6.4 多层超时控制#

func multiLayerTimeout() error {
// 外层:总超时 30 秒
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
// 阶段1:数据库查询,超时 10 秒
dbCtx, dbCancel := context.WithTimeout(ctx, 10*time.Second)
data, err := queryDB(dbCtx)
dbCancel()
if err != nil {
return fmt.Errorf("database query: %w", err)
}
// 阶段2:外部 API 调用,超时 15 秒
apiCtx, apiCancel := context.WithTimeout(ctx, 15*time.Second)
result, err := callAPI(apiCtx, data)
apiCancel()
if err != nil {
return fmt.Errorf("api call: %w", err)
}
return nil
}

7. errgroup 并发错误处理#

7.1 golang.org/x/sync/errgroup#

errgroup 提供了并发任务组的管理,当任一任务出错时,其他任务会被取消。

import "golang.org/x/sync/errgroup"
func fetchAll(urls []string) ([]Page, error) {
g, ctx := errgroup.WithContext(context.Background())
pages := make([]Page, len(urls))
for i, url := range urls {
i, url := i, url // 捕获循环变量
g.Go(func() error {
// 如果其他 goroutine 出错,ctx 会被取消
page, err := fetchPage(ctx, url)
if err != nil {
return err
}
pages[i] = page
return nil
})
}
// 等待所有任务完成,返回第一个错误(如果有)
if err := g.Wait(); err != nil {
return nil, err
}
return pages, nil
}

7.2 errgroup 工作原理#

errgroup 执行流程:
┌─────────────────────────────────────────────────────┐
│ errgroup │
│ │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ Task 1 │ │ Task 2 │ │ Task 3 │ │
│ │ (成功) │ │ (出错) │ │ (运行中)│ │
│ └────┬────┘ └────┬────┘ └────┬────┘ │
│ │ │ │ │
│ │ ┌───▼───┐ │ │
│ │ │ error │◀────────┘ │
│ │ └───┬───┘ ctx 被取消 │
│ │ │ │
│ └─────────────┼─────────────────┐ │
│ ▼ │ │
│ ┌─────────────┐ │ │
│ │ g.Wait() │◀─────────┘ │
│ │ 返回 error │ │
│ └─────────────┘ │
└─────────────────────────────────────────────────────┘

7.3 带并发限制的 errgroup#

func fetchWithLimit(urls []string, maxConcurrent int) ([]Page, error) {
g, ctx := errgroup.WithContext(context.Background())
// 使用信号量限制并发数
sem := make(chan struct{}, maxConcurrent)
pages := make([]Page, len(urls))
for i, url := range urls {
i, url := i, url
sem <- struct{}{} // 获取信号量
g.Go(func() error {
defer func() { <-sem }() // 释放信号量
page, err := fetchPage(ctx, url)
if err != nil {
return err
}
pages[i] = page
return nil
})
}
if err := g.Wait(); err != nil {
return nil, err
}
return pages, nil
}

7.4 errgroup vs sync.WaitGroup#

特性sync.WaitGrouperrgroup
错误处理需要手动实现自动收集首个错误
取消机制自动取消其他任务
使用复杂度较高较低
Context 集成需要手动集成原生支持
适用场景无需错误处理需要错误处理和取消

8. 资源泄漏防护#

8.1 goroutine 泄漏检测#

使用 runtime 监控 goroutine 数量:

func monitorGoroutines() {
ticker := time.NewTicker(5 * time.Second)
for range ticker.C {
count := runtime.NumGoroutine()
log.Printf("current goroutines: %d", count)
if count > 100 {
log.Printf("WARNING: too many goroutines!")
}
}
}

8.2 常见泄漏场景及修复#

// 场景1:无缓冲 channel 阻塞
func leak1() {
ch := make(chan int)
go func() {
ch <- 42 // 没有接收者,永久阻塞
}()
}
// 修复:使用带缓冲 channel 或确保有接收者
func fixed1() {
ch := make(chan int, 1)
go func() {
ch <- 42
}()
<-ch
}
// 场景2:select 缺少退出条件
func leak2() {
ch := make(chan int)
go func() {
for {
select {
case v := <-ch:
fmt.Println(v)
// 缺少退出条件
}
}
}()
}
// 修复:添加 context 取消
func fixed2(ctx context.Context) {
ch := make(chan int)
go func() {
for {
select {
case v := <-ch:
fmt.Println(v)
case <-ctx.Done():
return
}
}
}()
}
// 场景3:阻塞在 nil channel
func leak3() {
var ch chan int // nil channel
go func() {
<-ch // 永久阻塞
}()
}
// 修复:确保 channel 已初始化
func fixed3() {
ch := make(chan int)
close(ch)
go func() {
<-ch // 接收零值后退出
}()
}

8.3 使用 pprof 诊断泄漏#

import _ "net/http/pprof"
func main() {
go func() {
http.ListenAndServe("localhost:6060", nil)
}()
// 访问 http://localhost:6060/debug/pprof/goroutine?debug=1
// 查看 goroutine 堆栈
}

9. 常见并发陷阱#

9.1 闭包捕获循环变量#

// 错误:所有 goroutine 捕获同一个变量
func wrong() {
for i := 0; i < 3; i++ {
go func() {
fmt.Println(i) // 可能输出 3, 3, 3
}()
}
}
// 正确:创建局部变量副本
func right() {
for i := 0; i < 3; i++ {
i := i // 捕获当前值
go func() {
fmt.Println(i) // 输出 0, 1, 2
}()
}
}
// 正确:作为参数传递
func alsoRight() {
for i := 0; i < 3; i++ {
go func(n int) {
fmt.Println(n)
}(i)
}
}

9.2 向已关闭的 channel 发送#

// 错误:向已关闭 channel 发送会 panic
func wrong() {
ch := make(chan int)
close(ch)
ch <- 1 // panic: send on closed channel
}
// 正确:使用 defer-recover 或确保只有一个关闭者
func right() {
ch := make(chan int, 1)
// 使用 sync.Once 确保只关闭一次
var once sync.Once
once.Do(func() { close(ch) })
}

9.3 关闭 nil channel#

// 错误:关闭 nil channel 会 panic
func wrong() {
var ch chan int
close(ch) // panic: close of nil channel
}
// 正确:检查 channel 是否为 nil
func right(ch chan int) {
if ch != nil {
close(ch)
}
}

9.4 死锁:循环等待#

// 死锁场景
func deadlock() {
ch1 := make(chan int)
ch2 := make(chan int)
go func() {
<-ch1
ch2 <- 1
}()
<-ch2 // 等待 ch2,但 ch2 需要 ch1 先被发送
ch1 <- 1 // 永远无法到达
}
// 修复:使用缓冲 channel 或调整顺序
func fixed() {
ch1 := make(chan int, 1)
ch2 := make(chan int, 1)
go func() {
<-ch1
ch2 <- 1
}()
ch1 <- 1
<-ch2
}

9.5 竞态条件#

// 竞态条件
func race() {
var counter int
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
counter++ // 竞态!
}()
}
wg.Wait()
fmt.Println(counter) // 结果不确定
}
// 使用互斥锁
func withMutex() {
var counter int
var mu sync.Mutex
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
mu.Lock()
counter++
mu.Unlock()
}()
}
wg.Wait()
}
// 使用 atomic
func withAtomic() {
var counter int64
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
atomic.AddInt64(&counter, 1)
}()
}
wg.Wait()
}
// 使用 channel
func withChannel() {
counter := make(chan int, 1)
counter <- 0 // 初始化
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
c := <-counter
counter <- c + 1
}()
}
wg.Wait()
final := <-counter
fmt.Println(final)
}

10. 总结#

10.1 模式选择指南#

场景 ──▶ 推荐模式
数据需要多阶段处理 ──▶ Pipeline
多任务并行执行 ──▶ Fan-out/Fan-in
限制并发数量 ──▶ Worker Pool
需要取消/超时控制 ──▶ Context
并发任务需要错误处理 ──▶ errgroup

10.2 并发安全检查清单#

  • 所有 goroutine 都有退出路径
  • 使用 context 实现取消传播
  • 避免向已关闭的 channel 发送
  • 共享资源使用互斥锁或 atomic
  • 使用 go run -race 检测竞态
  • 监控 goroutine 数量,防止泄漏
  • 正确处理并发任务的错误

10.3 工具推荐#

工具用途
go run -race竞态条件检测
net/http/pprofgoroutine 分析
golang.org/x/sync/errgroup并发错误处理
golang.org/x/tools/gopls静态分析

八、常见问题#

Q1:Pipeline 模式中如何处理错误?#

每个阶段遇到错误时通过专门的 error channel 传递,或使用 errgroup 管理所有 goroutine 的错误。推荐 errgroup,它会在任一 goroutine 出错时取消所有其他 goroutine。

Q2:Worker Pool 的 worker 数量怎么确定?#

通常设为 CPU 核心数(runtime.NumCPU())对于 CPU 密集型任务,或 2-10 倍核心数对于 IO 密集型任务。实际值应根据基准测试调整。

Q3:Fan-out 和 Worker Pool 有什么区别?#

Fan-out 是将同一输入分发给多个消费者并行处理(广播),Worker Pool 是多个 worker 从同一 channel 消费任务(竞争)。Fan-out 适合相同数据的不同处理,Worker Pool 适合不同数据的相同处理。

Q4:errgroup 和 WaitGroup 怎么选?#

errgroupWaitGroup 基础上增加了错误收集和 context 取消功能。如果需要知道哪个 goroutine 出错或需要取消,用 errgroup;如果只需等待完成,用 WaitGroup。

小结#

  • Pipeline 模式将处理流程分解为多个阶段,通过 channel 连接,实现流式处理
  • Fan-out/Fan-in 模式实现并行分发和结果汇聚,提高吞吐量
  • Worker Pool 限制并发 goroutine 数量,避免资源耗尽
  • Context 用于取消和超时控制,是 Go 并发编程的基础设施
  • errgroup 简化并发错误管理,支持错误收集和自动取消

参考资料#

支持与分享

如果这篇文章对你有帮助,欢迎支持作者或分享给更多人

Go 并发模式实战
https://blog.souloss.com/posts/golang/go-concurrency/
作者
Souloss
发布于
2022-08-31
许可协议
CC BY-NC-SA 4.0

部分信息可能已经过时