原语简介
Go同步原语主要分为基本原语和扩展原语,一般只需要了解基本原语即可,基本原语由sync
包提供,包括Mutex
, RWMutex
, WaitGroup
, Once
和Cond
; 扩展原语有x/sync
包提供,主要有ErrGroup
, Semaphore
和SingleFlight
Mutex
结构体
Mutex
是普通的互斥锁,由两个字段state
和sema
组成,state
表示互斥锁状态,sema
控制锁状态的信号量
type Mutex struct {
state int32
sema int32
}
状态state
互斥锁状态是int32
表示的,但是锁的状态不是互斥的,最低三位分别表示mutexLocked
, mutexWoken
和mutexStaving
,剩下的位置都用来表示有多少个goroutine等待互斥锁被释放
默认初始化状态(state
)都是0
, 互斥锁被锁定时,mutextLocked
会被置成1
, 互斥锁在正常模式下被唤醒时, mutexWoken
会被置成1
, mutexStaving
表示互斥锁进入了饥饿模式, 稍后会介绍
饥饿模式
go语言的互斥锁在1.9版本之后,分成了普通模式和饥饿模式,主要是为了保证互斥锁获取的公平性, 能够避免goroutine陷入等待无法获取锁而造成较高的延时。
正常模式下,互斥锁的获取是按照FIFO
的顺序进行的,但是如果一个goroutine进程也调用lock
时,那么它大概率是获取不到锁的,为了减少这种情况的出现,就设计了饥饿模式。
饥饿模式下, 互斥锁会直接交给等待队列最前面的goroutine,新的goroutine不能获取到锁,也无法进入自旋, 只能在队列末尾等待
正常模式切饥饿模式:goroutine超过1ms没有获取到锁,会从正常模式切换到饥饿模式
饥饿模式切正常模式:goroutine获得了互斥锁且是队列最末尾的协程或者它的等待时间少于1ms,会从饥饿模式切换会正常模式
加锁
互斥锁Mutex
加锁是由Lock
方法完成的,如果锁的状态是0
,直接将mutexLocked
置位1
, 否则, 进入lockSlow
方法,尝试通过自旋或其他方法等待锁的释放并获取互斥锁。源码
自旋其实在多线程同步的过程中使用的一种机制,当前的进程在进入自旋的过程会一直保持CPU的占用,持续检查某个条件是否为真,在多核CPU上, 自旋的优点避免了goroutine的切换,可以提高性能
进入自旋条件:
运行在多CPU机器上 当前goroutine获取锁进入自旋的次数小于4次 当前机器上至少存在一个正在运行的处理器P并且处理的队列是空的 普通模式下才能进入自旋
当判定能够进入自旋时,会调用runtime_doSpin
,最终的调用是通过汇编语言调用指定次数的PAUSE
指令
处理了自旋相关的逻辑后,互斥锁根据上下文情况计算当前互斥锁的状态,并更新state
的信息,修改成功后,如果是锁定状态或饥饿模式,直接跳过,否则调用runtime_SemacquireMutex
方法, runtime_SemacquireMutex
主要作用就是通过Mutex
的信号量保证资源不会被两个goroutine获取, 会不断调用goparkunlock
将当前goroutine陷入休眠等待信号量可以被获取
一旦获取到信号量,说明互斥锁被解锁,该方法会立刻返回,执行Lock
后面的代码,最后会判断是否退出饥饿模式,如果可以退出饥饿模式,则会切换到正常模式
解锁
互斥锁的解锁就非常简单,Unlock
会直接调用atomic
包提供的AddInt32
,如果返回的新状态不等于0
就会进入unlockSlow
方法,源码
饥饿模式下,会直接调用runtime_Semrelease
方法,将锁交给下一个等待者,等待者会在被唤醒后设置mutexLocked
状态,此时仍然是mutexStarving
状态,所以新的goroutine也无法获取锁
正常模式下, 如果互斥锁不存在等待者或者最低三位都是0, 那么就不需要唤醒其他goroutine,直接返回,当有goroutine处于等待状态时,还是会通过runtime_Semrelease
唤醒对应的goroutime并移交所有权
RWMutex
读写互斥锁只是在写的时候互斥,读的操作是可以并行的
结构体
type RWMutex struct {
w Mutex
writerSem uint32
readerSem uint32
readerCount int32
readerWait int32
}
包含一个互斥锁,还有两个信号量(分别用于写等待读和读等待写), readerCount
存储了当前正在执行读操作的数量,readerWait
表示当写操作被阻塞时等待的读操作个数
读锁
加锁读锁时,通过atomic.AddInt32
方法为readerCount
加一,如果返回了负数,则说明有goroutine获得了写锁,当前goroutine会调用runtime_SemacquireMutex
陷入休眠等待唤醒
释放读锁时,会调用RUnlock
方法为readerCount
减一,如果遇到了返回值小于0的情况,则说明有个正在写的操作,这时就调用rUnlockSlow
方法减少当前写操作等待读操作数readerWait
, 并在所有读操作都被释放之后触发写操作的信号量writerSem
读写锁
资源使用者想获取写锁时,需要通过Lock
方法,该方法先调用了互斥锁持有的Mutex
的Lock
方法, 保证其他获取读写锁的goroutine进入等待状态,然后通过调用atomic.AddInt32
阻塞后续读的操作
如果有其它的goroutine持有互斥锁的读锁,该goroutine会调用runtime_SemacquireMutex
进入休眠状态,等待读锁释放时触发writerSem
信号量将当前协程唤醒
对资源读写操作完成之后会通过atomic.AddInt32
变回正数,并通过for
循环触发所有由于获取读锁而陷入等待的goroutine,最后,RWMutex
会释放持有的互斥锁让其他的协程能够重新获取读写锁
WaitGroup
WaitGroup
是sync
包比较常见的同步机制,用于等待一系列的goroutine的返回,常见的场景是批量执行RPC调用
requests := []*Request{...}
wg := &sync.WaitGroup{}
wg.Add(len(requests))
for _, req := range requests {
go func(r *Request) {
defer wg.Done()
}(req)
}
wg.Wait()
通过WaitGroup
可以轻松同步信息,加快了程序处理的速度,只有等Wait
方法返回后,程序才会调用其他的逻辑
结构体
type WaitGroup struct {
noCopy noCopy
state1 [3]int32
}
copylock
是一个编译期间检查拷贝的,noCopy
是不允许拷贝的。此外,还包括一个共占用12字节大小的数组,用来存储当前结构体持有的状态和信号量,在64和32位机器上表现不同。WaitGroup
提供了state
私有方法,来提取state1
字段的状态和信号量
操作
WaitGroup
包含三个操作Add
, Done
和Wait
, 其中Done
只是调用Add(-1)
Add
主要作用就是更新WaitGroup
中持有的计数器counter
, 计数器只能是非负数。当调用Add
方法导致计数器归零且还有等待的goroutine时,会通过runtime_Semrelease
唤醒处于等待状态的所有goroutine
Wait
会在当前计数器中保存的数据大于0时,修改等待goroutine的个数waiter
并调用runtime_Semacquire
陷入睡眠状态。陷入睡眠的goroutine会等待Add
方法在计数器为0时唤醒
小结
Add
和Wait
不能在goroutine中并发调用,否则会造成崩溃WaitGroup
必须在Wait
方法返回后才能重新被使用可以同时有多个goroutine等待当前 WaitGroup
计数器归零, 这些goroutine也会被唤醒
Once
sync
提供的Once
主要功能是程序运行期间,某段代码只会执行一次。如下面的代码,只会输出一次
func main() {
o := &sync.Once{}
for i := 0; i < 10; i++ {
o.Do(func() {
fmt.Println("once")
})
}
}
结构体
type Once struct {
done uint32
m Mutex
}
结构体只包含一个标识代码段是否执行过的done
和互斥锁Mutex
操作
只有一个操作Do
, 接受一个入参为空的函数,使用atomic.LoadUint32
检查是否执行过,否则会进入doSlow
运行函数。
doSlow
实现也很简单,先为当前goroutine获取互斥锁,然后通过defer
关键字将done
成员变量设置成1
, 并运行函数, 无论函数运行过程中是否panic
,都会把done
设置成1
所以,不管Do
传入了什么参数,函数都只会被执行一次,且不管执行结果如何
Cond
Cond
是一个条件变量,通过Cond
可以让一系列的goroutine在触发某事件或条件时才会被唤醒,比如下面的代码,10个goroutine会通过Wait
等待期望的信号或事件,剩下的一个goroutine会调用Broadcast
通知陷入等待的goroutine, 调用之后,会打印10次"listen"
func main() {
c := &sync.NewCond(&sync.Mutex{})
for i := 0; i < 10; i++ {
go listen(c)
}
go broadcast(c)
ch := make(chan os.Signal, 1)
signal.Notify(ch, os.Interrupt)
<- ch
}
func broadcast(c *sync.Cond) {
c.L.Lock()
c.Broadcast()
c.L.Unlock()
}
func listen(c *sync.Cond) {
c.L.Lock()
c.Wait()
fmt.Println("listen")
c.L.Unlock()
}
结构体
type Cond struct {
noCopy noCopy
L Locker
notify notifyList
checker copyChecker
}
type notifyList struct {
wait uint32
notify uint32
lock mutex
head *sudog
tail *sudog
}
noCopy
保证编译期间Cond
不会被拷贝checker
保证运行期间发生拷贝直接panic持有的锁 L
是Locker
的一个接口,任意实现Lock
和Unlock
都可以作为Cond
的锁参数notifyList
是一个goroutine的链表,为了实现Cond
的同步机制,head
和tail
分别是链表的头和尾,wait
表示正在等待的goroutine,notify
表示已经通知到的goroutine
操作
Wait
方法会将当前goroutine陷入休眠状态,会先调用runtime_notifyListAdd
将等待计数器+1
,然后解锁并调用runtime_notifyListWait
等待其他goroutine的唤醒。
notifyListWait
的主要作用就是获取当前的goroutine,并将其追加到notifyList
链表末尾, 同时还会调用goparkunlock
陷入休眠状态,让出处理器的使用并等待调度器唤醒
Signal
和Broadcast
用来唤醒调用Wait
陷入休眠的goroutine, Signal
会唤醒队列最前面的goroutine, Broadcast
会唤醒所有的goroutine
func (c *Cond) Signal() {
c.checker.check()
runtime_notifyListNotifyOne(&c.notify)
}
func (c *Cond) Broadcast() {
c.checker.check()
runtime_notifyListNotifyAll(&c.notify)
}
notifyListNotifyAll
会从链表中取出所有的goroutine,并一次调用readyWithTime
,通过goready
将目标的goroutine唤醒.唤醒顺序与加入的顺序有关
notifyListNotifyOne
会只从链表中满足sudog.ticket == l.notify
的goroutine,通过readyWithTime
唤醒
通常,会调用不满足特定条件时调用Wait
陷入休眠,当满足条件时,使用Signal
或Broadcast
唤醒一个或全部的goroutine
小结
Cond
比for {}
忙等待更能够在长时间无法满足条件时,提升性能Wait
调用之前一定要使用L.Lock
持有该资源,否则会发生panicSignal
和Broadcast
都是唤醒goroutine,注意使用的场景不一样
ErrGroup
x/sync
包提供的ErrGroup
提供了同步,错误传播以及上下文取消的功能。比如下面并行获取网页数据
var g errgroup.Group
var urls = []string{
"http://www.google.com/",
"http://www.google.org/".
"http://www.somestupidname.com/",
}
for i := range urls {
url := urls[i]
g.Go(func() error {
resp, err := http.Get(url)
if err == nil {
resp.Body.Close()
}
return err
})
}
if err := g.Wait(); err == nil {
fmt.Println("successfully fetch all urls")
}
结构体
type Group struct {
cancel func()
wg sync.WaitGroup
errOnce sync.Once
err error
}
cancel
函数用于通知context
的goroutine由于某些子任务出错, 可以停止让出资源了WaitGroup
等待任务完成的WaitGroup
同步原语errOnce
接受任务返回的err
且保证err
只会被赋值一次
操作
WithContext
通过context
构造errgroup
, 返回的WithCancel
也会在Group
结构体内部使用
Go
方法用于创建新的并行子任务。这个方法会对WaitGroup
加1并创建一个新的goroutine,返回错误时调用cancel
并对err
赋值, 只有最早的错误才能被感知,后续错误会被丢弃
Wait
只是调用WaitGroup
的同步方法,在子任务完成时取消context
并返回可能出现的错误
Semaphore
信号量是并发编程常用的一种同步机制,保证持有的计数器在0到初始化权重之间。获取资源时,会将信号量的计数器减去对应值,释放资源时,重新加回来。当计数器大于信号量时会进入休眠状态,等待其他进程释放信号。可以控制访问资源的进程数量限制
go提供的带权重的信号量
结构体
type Weighted struct {
size int64
cur int64
mu sync.Mutex
waiters list.List
}
size
信号量总的大小, cur
当前信号量的计数器, mu
互斥锁,waiters
等待获取资源的goroutine
操作
Acquire
获取制定权重资源的方法。分三种情况讨论
剩余资源大于获取的资源,且没有等待的goroutine时,直接获取信号量 需要获取的信号量大于 size
时, 不满足条件,直接返回其他情况,将当前goroutine加入到等待列表,并通过 select
等待当前goroutine被唤醒,唤醒之后就会获取信号量
TryAcquire
只会判断当前信号量是否有充足的资源获取,有的话返回true
,否则返回false
。TryAcquire
不会等待资源释放,所以更适用于延时敏感,用户立刻感知结果的场景
Release
对当前的信号量释放,以FIFO的顺序唤醒goroutine。会遍历waiters
中所有的等待者, 有充足的剩余资源就会通过channel
唤起制定的goroutine。也会存在无法唤起的情况,此时释放锁后就直接返回。如果信号量需要的资源非常多,可以通过Acquire
引入context
的超时时间
SingleFlight
能够在一个服务中抑制对下游的多次重复请求。一个比较常见的场景是,使用redis数据库,并设置了过期时间,如果非常多的请求打过来,但是缓存已经过期,此时会直接打穿到数据库,从而造成缓存击穿。singleflight
主要作用就是对于同一个key最终只会进行一次函数调用。举例
type service struct {
requestGroup singleflight.Group
}
func (s *service) handleRequest(ctx context.Context, req Request) (Response, error) {
v, err , _ := requestGroup.Do(req.Hash(), func()(interface{}, error) {
rows, err := // select * from tables
if err != nil {
return nil, err
}
return rows, nil
})
if err != nil {
return nil, err
}
return Response{
rows: rows,
}, nil
}
结构体
type Group struct {
mu sync.Mutex
m map[string]*call
}
type call struct {
wg sync.WaitGroup
val interface{}
err error
dups int
chans []chan-<Result
}
call
结构体中的val
和err
字段在执行传入的函数时,只会被赋值一次,只会在WaitGroup
等待结束被读取, dups
和chans
分别存储singleflight
抑制的请求数量以及在结果返回时将信息传递给调用方
操作
singleflight
提供了两个抑制请求的方法,Do
和DoChan
, 两个方法在功能上没有多大区别,只是实现上有差异
Do
方法的调用会获取互斥锁并尝试对Group
持有映射表进行懒加载,随后判断是否已经存在key
对应的函数调用
当不存在对应的 call
结构体时:初始化新的call
结构体指针;增加WaitGroup
持有的计数器;将call
结构体指针添加到映射表;释放持有的互斥锁Mutex
;阻塞调用doCall
等待结果返回当存在对应的 call
结构体时:增加dups
计数器;释放持有的互斥锁Mutex
;通过WaitGroup.Wait
等待请求的返回
val
和err
只会在doCall
中被赋值,所以当doCall
方法和WaitGroup.Wait
方法返回时,这两个值就会返回给Do
的调用者
DoChan
通过管道将信息同步给调用方,底层最终也是异步调用doCall
DoChan
和Do
的区别就是,DoChan
使用goroutine异步执行doCall
并向call
持有的chans
切片中追加chan Result
,实现异步传值。一旦调用的函数返回了错误,所有在等待的goroutine也会收到同样的错误
Forget
可以通知singleflight
在持有的映射表中删除某个键
总结
Mutex
互斥锁,有普通模式和饥饿模式RWMutex
读写互斥锁WaitGroup
等待一组goroutine结束Once
程序运行期间仅执行一次Cond
发生指定事件时唤醒ErrGroup
为一组goroutine提供同步,错误传播以及上下文取消的功能Semaphore
带权重的信号量SingleFlight
抑制对下游的重复请求
参考资料
Golang并发编程 Golang src





