基本概念与启动
goroutine 是 Go 的轻量线程,由 Go 运行时管理 用 go f() 启动:非阻塞,函数在后台并发执行
package main
import (
"fmt"
"time"
)
func say(s string) {
for i:=0;i<3;i++ {
fmt.Println(s, i)
time.Sleep(100 * time.Millisecond)
}
}
func main() {
go say("world") // 在后台并发执行
say("hello")
}
解释:主 goroutine 会先执行 say("hello"),同时另一个 goroutine 在打印 world。如果 main 提早退出,后台 goroutine 会被杀掉
自定义使用核心
- 并发:同时有多个任务在处理
- 并行:任务在多个 CPU 核心上同时运行
runtime.GOMAXPROCS(n)控制能并行运行的 OS 线程数
示例:
package main
import (
"fmt"
"runtime"
"time"
)
func busy(id int) {
for i:=0;i<5;i++ {
fmt.Println("worker", id, "step", i)
time.Sleep(100 * time.Millisecond)
}
}
func main() {
runtime.GOMAXPROCS(1) // 强制单核执行(观察并发但非并行)
go busy(1)
go busy(2)
time.Sleep(700 * time.Millisecond)
fmt.Println("Goroutines:", runtime.NumGoroutine())
}
解释:即使 GOMAXPROCS=1,也能并发执行(协作式调度),但不会真正并行占用多核
channel 基础
- channel 是 goroutine 间通信的管道
ch := make(chan T)(无缓冲,同步)或make(chan T, N)(缓冲,异步)
无缓冲示例(同步):
package main
import "fmt"
func main() {
ch := make(chan int)
go func() {
ch <- 42 // 发送者阻塞直到有人接收
}()
fmt.Println(<-ch) // 接收
}
缓冲示例(异步):
ch := make(chan int, 2)
ch <- 1
ch <- 2
// 发送不会阻塞,除非缓冲满
fmt.Println(<-ch, <-ch)
channel 方向、关闭、range 接收
- 声明只发送或只接收的 channel:
chan<- int或<-chan int close(ch):关闭 channel,接收者可接收到零值并继续读取(以检测结束)- 用
for v := range ch迭代直到 channel 关闭
示例:
package main
import "fmt"
func producer(ch chan<- int) {
for i:=0;i<5;i++ { ch <- i }
close(ch)
}
func main() {
ch := make(chan int)
go producer(ch)
for v := range ch {
fmt.Println("recv", v)
}
}
注意:不要向已关闭的 channel 发送,否则 panic
select:多路复用与超时/默认分支
select让你在多个 channel 操作中等待任一可用- 支持
default(无阻塞)和time.After用于超时
示例(超时):
package main
import (
"fmt"
"time"
)
func main() {
ch := make(chan string)
go func() {
time.Sleep(500 * time.Millisecond)
ch <- "result"
}()
select {
case res := <-ch:
fmt.Println(res)
case <-time.After(200 * time.Millisecond):
fmt.Println("timeout")
}
}
解释:这里会打印 timeout,因为接收比超时慢
WaitGroup:等待一组 goroutine 完成
sync.WaitGroup用于等待多个 goroutine 完成,常见模式:wg.Add(n)→ 每个 goroutine 执行defer wg.Done()→wg.Wait()
示例:
package main
import (
"fmt"
"sync"
)
func worker(id int, wg *sync.WaitGroup) {
defer wg.Done()
fmt.Println("work", id)
}
func main() {
var wg sync.WaitGroup
for i:=0;i<3;i++ {
wg.Add(1)
go worker(i, &wg)
}
wg.Wait()
fmt.Println("all done")
}
Mutex / RWMutex:互斥与读写锁
sync.Mutex:互斥锁,保护共享数据sync.RWMutex:读写锁,允许并发读,但写互斥
示例(Mutex):
var (
counter int
mu sync.Mutex
)
func inc() {
mu.Lock()
counter++
mu.Unlock()
}
示例(RWMutex):
var (
data map[string]int
rw sync.RWMutex
)
func read(k string) int {
rw.RLock()
v := data[k]
rw.RUnlock()
return v
}
func write(k string, v int) {
rw.Lock()
data[k] = v
rw.Unlock()
}
原子操作
- 对于简单的整型计数,
sync/atomic提供比 Mutex 更快的原子操作(无锁) - 常用:
atomic.AddInt64,atomic.LoadInt64,atomic.CompareAndSwapInt64
示例:
package main
import (
"fmt"
"sync"
"sync/atomic"
)
func main(){
var counter int64
var wg sync.WaitGroup
for i:=0;i<1000;i++ {
wg.Add(1)
go func(){ atomic.AddInt64(&counter, 1); wg.Done() }()
}
wg.Wait()
fmt.Println("counter", atomic.LoadInt64(&counter))
}
注意:原子适合简单场景,复杂结构仍需 Mutex
管道化
- Pipelining:把处理分成多个 stage,通过 channel 串联起来
- Fan-out:多个 worker 接收同一输入 channel 并发工作
- Fan-in:多个 worker 的输出合并回一个 channel
示例(简单 pipeline + fan-out/fan-in):
package main
import (
"fmt"
"sync"
)
func gen(nums ...int) <-chan int {
out := make(chan int)
go func(){
for _, n := range nums { out <- n }
close(out)
}()
return out
}
func sq(in <-chan int) <-chan int {
out := make(chan int)
go func(){
for n := range in { out <- n*n }
close(out)
}()
return out
}
func main() {
in := gen(2,3,4)
out := sq(in)
for v := range out { fmt.Println(v) } // 4,9,16
}
示例:
func worker(id int, jobs <-chan int, results chan<- int, wg *sync.WaitGroup) {
defer wg.Done()
for j := range jobs {
results <- j * 2
}
}
func main() {
jobs := make(chan int, 5)
results := make(chan int, 5)
var wg sync.WaitGroup
for w:=1;w<=3;w++ { wg.Add(1); go worker(w,jobs,results,&wg) }
for j:=1;j<=5;j++ { jobs <- j }
close(jobs)
go func(){ wg.Wait(); close(results) }()
for r := range results { fmt.Println(r) }
}
goroutine 泄漏与预防
- 泄漏场景:goroutine 在等待无法返回的 channel 或永远阻塞
- 预防:使用
context+ timeout、确保 channel 最终会被关闭、select 带超时或取消分支
常见泄漏示例(错误):
func f(ch <-chan int) {
<-ch // 如果 ch 永远不关闭,这个 goroutine 永久阻塞
}
改进(带取消):
func f(ctx context.Context, ch <-chan int) {
select {
case <-ch:
case <-ctx.Done():
}
}
runtime 包相关
runtime.Gosched():让出当前线程,调度器去运行其他 goroutine(相当于短暂停)runtime.NumGoroutine():返回当前 goroutine 数runtime.GOMAXPROCS(n):设置可并行的操作系统线程数runtime.Stack/pprof可用于获取 goroutine 堆栈
示例:
import "runtime"
fmt.Println("goroutines:", runtime.NumGoroutine())
设计与最佳实践总结
- 尽量避免共享可变状态:优先通过 channel 传递数据(消息传递优于共享内存)
- 用 context 传播取消/超时:每个后台 goroutine 都考虑可取消性
- 短小单一职责 goroutine:一个 goroutine 做一件事(易理解、易管理)
- 避免长时间阻塞在外部资源:用超时、重试、断路器模式保护
- 及时关闭 channel:producer 负责 close,consumer 检测结束
- 使用 WaitGroup 管理生命周期:主流程等待 worker 结束
- 加入日志与指标(metrics):记录 goroutine 启停、错误、处理速率
- 使用 race detector 做开发期检查,并用 pprof 做性能分析