暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Go同步原语

专业造轮子 2021-10-09
340

原语简介

Go同步原语主要分为基本原语和扩展原语,一般只需要了解基本原语即可,基本原语由sync
包提供,包括Mutex
, RWMutex
, WaitGroup
, Once
Cond
; 扩展原语有x/sync
包提供,主要有ErrGroup
, Semaphore
SingleFlight

Mutex

结构体

Mutex
是普通的互斥锁,由两个字段state
sema
组成,state
表示互斥锁状态,sema
控制锁状态的信号量

type Mutex struct {
    state int32
    sema  int32
}

状态state

互斥锁状态是int32
表示的,但是锁的状态不是互斥的,最低三位分别表示mutexLocked
, mutexWoken
mutexStaving
,剩下的位置都用来表示有多少个goroutine等待互斥锁被释放

默认初始化状态(state
)都是0
, 互斥锁被锁定时,mutextLocked
会被置成1
, 互斥锁在正常模式下被唤醒时, mutexWoken
会被置成1
, mutexStaving
表示互斥锁进入了饥饿模式, 稍后会介绍

饥饿模式

go语言的互斥锁在1.9版本之后,分成了普通模式和饥饿模式,主要是为了保证互斥锁获取的公平性, 能够避免goroutine陷入等待无法获取锁而造成较高的延时。

正常模式下,互斥锁的获取是按照FIFO
的顺序进行的,但是如果一个goroutine进程也调用lock
时,那么它大概率是获取不到锁的,为了减少这种情况的出现,就设计了饥饿模式。

饥饿模式下, 互斥锁会直接交给等待队列最前面的goroutine,新的goroutine不能获取到锁,也无法进入自旋, 只能在队列末尾等待

正常模式切饥饿模式:goroutine超过1ms没有获取到锁,会从正常模式切换到饥饿模式

饥饿模式切正常模式:goroutine获得了互斥锁且是队列最末尾的协程或者它的等待时间少于1ms,会从饥饿模式切换会正常模式

加锁

互斥锁Mutex
加锁是由Lock
方法完成的,如果锁的状态是0
,直接将mutexLocked
置位1
, 否则, 进入lockSlow
方法,尝试通过自旋或其他方法等待锁的释放并获取互斥锁。源码

自旋其实在多线程同步的过程中使用的一种机制,当前的进程在进入自旋的过程会一直保持CPU的占用,持续检查某个条件是否为真,在多核CPU上, 自旋的优点避免了goroutine的切换,可以提高性能

进入自旋条件:

  1. 运行在多CPU机器上
  2. 当前goroutine获取锁进入自旋的次数小于4次
  3. 当前机器上至少存在一个正在运行的处理器P并且处理的队列是空的
  4. 普通模式下才能进入自旋

当判定能够进入自旋时,会调用runtime_doSpin
,最终的调用是通过汇编语言调用指定次数的PAUSE
指令

处理了自旋相关的逻辑后,互斥锁根据上下文情况计算当前互斥锁的状态,并更新state
的信息,修改成功后,如果是锁定状态或饥饿模式,直接跳过,否则调用runtime_SemacquireMutex
方法, runtime_SemacquireMutex
主要作用就是通过Mutex
的信号量保证资源不会被两个goroutine获取, 会不断调用goparkunlock
将当前goroutine陷入休眠等待信号量可以被获取

一旦获取到信号量,说明互斥锁被解锁,该方法会立刻返回,执行Lock
后面的代码,最后会判断是否退出饥饿模式,如果可以退出饥饿模式,则会切换到正常模式

解锁

互斥锁的解锁就非常简单,Unlock
会直接调用atomic
包提供的AddInt32
,如果返回的新状态不等于0
就会进入unlockSlow
方法,源码

饥饿模式下,会直接调用runtime_Semrelease
方法,将锁交给下一个等待者,等待者会在被唤醒后设置mutexLocked
状态,此时仍然是mutexStarving
状态,所以新的goroutine也无法获取锁

正常模式下, 如果互斥锁不存在等待者或者最低三位都是0, 那么就不需要唤醒其他goroutine,直接返回,当有goroutine处于等待状态时,还是会通过runtime_Semrelease
唤醒对应的goroutime并移交所有权

RWMutex

读写互斥锁只是在写的时候互斥,读的操作是可以并行的

结构体

type RWMutex struct {
    w           Mutex
    writerSem   uint32
    readerSem   uint32
    readerCount int32
    readerWait  int32
}

包含一个互斥锁,还有两个信号量(分别用于写等待读和读等待写), readerCount
存储了当前正在执行读操作的数量,readerWait
表示当写操作被阻塞时等待的读操作个数

读锁

加锁读锁时,通过atomic.AddInt32
方法为readerCount
加一,如果返回了负数,则说明有goroutine获得了写锁,当前goroutine会调用runtime_SemacquireMutex
陷入休眠等待唤醒

释放读锁时,会调用RUnlock
方法为readerCount
减一,如果遇到了返回值小于0的情况,则说明有个正在写的操作,这时就调用rUnlockSlow
方法减少当前写操作等待读操作数readerWait
, 并在所有读操作都被释放之后触发写操作的信号量writerSem

读写锁

资源使用者想获取写锁时,需要通过Lock
方法,该方法先调用了互斥锁持有的Mutex
Lock
方法, 保证其他获取读写锁的goroutine进入等待状态,然后通过调用atomic.AddInt32
阻塞后续读的操作

如果有其它的goroutine持有互斥锁的读锁,该goroutine会调用runtime_SemacquireMutex
进入休眠状态,等待读锁释放时触发writerSem
信号量将当前协程唤醒

对资源读写操作完成之后会通过atomic.AddInt32
变回正数,并通过for
循环触发所有由于获取读锁而陷入等待的goroutine,最后,RWMutex
会释放持有的互斥锁让其他的协程能够重新获取读写锁

WaitGroup

WaitGroup
sync
包比较常见的同步机制,用于等待一系列的goroutine的返回,常见的场景是批量执行RPC调用

requests := []*Request{...}
wg := &sync.WaitGroup{}
wg.Add(len(requests))

for _, req := range requests {
    go func(r *Request) {
        defer wg.Done()
    }(req)
}
wg.Wait()

通过WaitGroup
可以轻松同步信息,加快了程序处理的速度,只有等Wait
方法返回后,程序才会调用其他的逻辑

结构体

type WaitGroup struct {
    noCopy noCopy
    state1 [3]int32
}

copylock
是一个编译期间检查拷贝的,noCopy
是不允许拷贝的。此外,还包括一个共占用12字节大小的数组,用来存储当前结构体持有的状态和信号量,在64和32位机器上表现不同。WaitGroup
提供了state
私有方法,来提取state1
字段的状态和信号量

操作

WaitGroup
包含三个操作Add
, Done
Wait
, 其中Done
只是调用Add(-1)

Add
主要作用就是更新WaitGroup
中持有的计数器counter
, 计数器只能是非负数。当调用Add
方法导致计数器归零且还有等待的goroutine时,会通过runtime_Semrelease
唤醒处于等待状态的所有goroutine

Wait
会在当前计数器中保存的数据大于0时,修改等待goroutine的个数waiter
并调用runtime_Semacquire
陷入睡眠状态。陷入睡眠的goroutine会等待Add
方法在计数器为0时唤醒

小结

  • Add
    Wait
    不能在goroutine中并发调用,否则会造成崩溃
  • WaitGroup
    必须在Wait
    方法返回后才能重新被使用
  • 可以同时有多个goroutine等待当前WaitGroup
    计数器归零, 这些goroutine也会被唤醒

Once

sync
提供的Once
主要功能是程序运行期间,某段代码只会执行一次。如下面的代码,只会输出一次

func main() {
    o := &sync.Once{}
    for i := 0; i < 10; i++ {
        o.Do(func() {
            fmt.Println("once")
        })
    }
}

结构体

type Once struct {
    done uint32
    m Mutex
}

结构体只包含一个标识代码段是否执行过的done
和互斥锁Mutex

操作

只有一个操作Do
, 接受一个入参为空的函数,使用atomic.LoadUint32
检查是否执行过,否则会进入doSlow
运行函数。

doSlow
实现也很简单,先为当前goroutine获取互斥锁,然后通过defer
关键字将done
成员变量设置成1
, 并运行函数, 无论函数运行过程中是否panic
,都会把done
设置成1

所以,不管Do
传入了什么参数,函数都只会被执行一次,且不管执行结果如何

Cond

Cond
是一个条件变量,通过Cond
可以让一系列的goroutine在触发某事件或条件时才会被唤醒,比如下面的代码,10个goroutine会通过Wait
等待期望的信号或事件,剩下的一个goroutine会调用Broadcast
通知陷入等待的goroutine, 调用之后,会打印10次"listen"

func main() {
    c := &sync.NewCond(&sync.Mutex{})
    for i := 0; i < 10; i++ {
        go listen(c)
    }
    go broadcast(c)

    ch := make(chan os.Signal, 1)
    signal.Notify(ch, os.Interrupt)
    <- ch
}
func broadcast(c *sync.Cond) {
    c.L.Lock()
    c.Broadcast()
    c.L.Unlock()
}
func listen(c *sync.Cond) {
    c.L.Lock()
    c.Wait()
    fmt.Println("listen")
    c.L.Unlock()
}

结构体

type Cond struct {
    noCopy noCopy
    L Locker
    notify notifyList
    checker copyChecker
}
type notifyList struct {
    wait uint32
    notify uint32
    lock mutex
    head *sudog
    tail *sudog
}

  • noCopy
    保证编译期间Cond
    不会被拷贝
  • checker
    保证运行期间发生拷贝直接panic
  • 持有的锁L
    Locker
    的一个接口,任意实现Lock
    Unlock
    都可以作为Cond
    的锁参数
  • notifyList
    是一个goroutine的链表,为了实现Cond
    的同步机制,head
    tail
    分别是链表的头和尾, wait
    表示正在等待的goroutine,notify
    表示已经通知到的goroutine

操作

Wait
方法会将当前goroutine陷入休眠状态,会先调用runtime_notifyListAdd
将等待计数器+1
,然后解锁并调用runtime_notifyListWait
等待其他goroutine的唤醒。

notifyListWait
的主要作用就是获取当前的goroutine,并将其追加到notifyList
链表末尾, 同时还会调用goparkunlock
陷入休眠状态,让出处理器的使用并等待调度器唤醒

Signal
Broadcast
用来唤醒调用Wait
陷入休眠的goroutine, Signal
会唤醒队列最前面的goroutine, Broadcast
会唤醒所有的goroutine

func (c *Cond) Signal() {
    c.checker.check()
    runtime_notifyListNotifyOne(&c.notify)
}

func (c *Cond) Broadcast() {
    c.checker.check()
    runtime_notifyListNotifyAll(&c.notify)
}

notifyListNotifyAll
会从链表中取出所有的goroutine,并一次调用readyWithTime
,通过goready
将目标的goroutine唤醒.唤醒顺序与加入的顺序有关

notifyListNotifyOne
会只从链表中满足sudog.ticket == l.notify
的goroutine,通过readyWithTime
唤醒

通常,会调用不满足特定条件时调用Wait
陷入休眠,当满足条件时,使用Signal
Broadcast
唤醒一个或全部的goroutine

小结

  • Cond
    for {}
    忙等待更能够在长时间无法满足条件时,提升性能
  • Wait
    调用之前一定要使用L.Lock
    持有该资源,否则会发生panic
  • Signal
    Broadcast
    都是唤醒goroutine,注意使用的场景不一样

ErrGroup

x/sync
包提供的ErrGroup
提供了同步,错误传播以及上下文取消的功能。比如下面并行获取网页数据

var g errgroup.Group
var urls = []string{
    "http://www.google.com/",
    "http://www.google.org/".
    "http://www.somestupidname.com/",
}
for i := range urls {
    url := urls[i]
    g.Go(func() error {
        resp, err := http.Get(url)
        if err == nil {
            resp.Body.Close()
        }
        return err
    }) 
}
if err := g.Wait(); err == nil {
    fmt.Println("successfully fetch all urls")
}

结构体

type Group struct {
    cancel func()
    wg sync.WaitGroup
    errOnce sync.Once
    err error
}

  • cancel
    函数用于通知context
    的goroutine由于某些子任务出错, 可以停止让出资源了
  • WaitGroup
    等待任务完成的WaitGroup
    同步原语
  • errOnce
    接受任务返回的err
    且保证err
    只会被赋值一次

操作

WithContext
通过context
构造errgroup
, 返回的WithCancel
也会在Group
结构体内部使用

Go
方法用于创建新的并行子任务。这个方法会对WaitGroup
加1并创建一个新的goroutine,返回错误时调用cancel
并对err
赋值, 只有最早的错误才能被感知,后续错误会被丢弃

Wait
只是调用WaitGroup
的同步方法,在子任务完成时取消context
并返回可能出现的错误

Semaphore

信号量是并发编程常用的一种同步机制,保证持有的计数器在0到初始化权重之间。获取资源时,会将信号量的计数器减去对应值,释放资源时,重新加回来。当计数器大于信号量时会进入休眠状态,等待其他进程释放信号。可以控制访问资源的进程数量限制

go提供的带权重的信号量

结构体

type Weighted struct {
    size int64
    cur int64
    mu sync.Mutex
    waiters list.List
}

size
信号量总的大小, cur
当前信号量的计数器, mu
互斥锁,waiters
等待获取资源的goroutine

操作

Acquire
获取制定权重资源的方法。分三种情况讨论

  1. 剩余资源大于获取的资源,且没有等待的goroutine时,直接获取信号量
  2. 需要获取的信号量大于size
    时, 不满足条件,直接返回
  3. 其他情况,将当前goroutine加入到等待列表,并通过select
    等待当前goroutine被唤醒,唤醒之后就会获取信号量

TryAcquire
只会判断当前信号量是否有充足的资源获取,有的话返回true
,否则返回false
TryAcquire
不会等待资源释放,所以更适用于延时敏感,用户立刻感知结果的场景

Release
对当前的信号量释放,以FIFO的顺序唤醒goroutine。会遍历waiters
中所有的等待者, 有充足的剩余资源就会通过channel
唤起制定的goroutine。也会存在无法唤起的情况,此时释放锁后就直接返回。如果信号量需要的资源非常多,可以通过Acquire
引入context
的超时时间

SingleFlight

能够在一个服务中抑制对下游的多次重复请求。一个比较常见的场景是,使用redis数据库,并设置了过期时间,如果非常多的请求打过来,但是缓存已经过期,此时会直接打穿到数据库,从而造成缓存击穿。singleflight
主要作用就是对于同一个key最终只会进行一次函数调用。举例

type service struct {
    requestGroup singleflight.Group
}
func (s *service) handleRequest(ctx context.Context, req Request) (Response, error) {
    v, err , _ := requestGroup.Do(req.Hash(), func()(interface{}, error) {
        rows, err := // select * from tables
        if err != nil {
            return nil, err
        }
        return rows, nil
    })
    if err != nil {
        return nil, err
    }
    return Response{
        rows: rows,
    }, nil
}

结构体

type Group struct {
    mu sync.Mutex
    m map[string]*call
}
type call struct {
    wg sync.WaitGroup
    val interface{}
    err error
    dups int
    chans []chan-<Result
}

call
结构体中的val
err
字段在执行传入的函数时,只会被赋值一次,只会在WaitGroup
等待结束被读取, dups
chans
分别存储singleflight
抑制的请求数量以及在结果返回时将信息传递给调用方

操作

singleflight
提供了两个抑制请求的方法,Do
DoChan
, 两个方法在功能上没有多大区别,只是实现上有差异

Do
方法的调用会获取互斥锁并尝试对Group
持有映射表进行懒加载,随后判断是否已经存在key
对应的函数调用

  1. 当不存在对应的call
    结构体时:初始化新的call
    结构体指针;增加WaitGroup
    持有的计数器;将call
    结构体指针添加到映射表;释放持有的互斥锁Mutex
    ;阻塞调用doCall
    等待结果返回
  2. 当存在对应的call
    结构体时:增加dups
    计数器;释放持有的互斥锁Mutex
    ;通过WaitGroup.Wait
    等待请求的返回

val
err
只会在doCall
中被赋值,所以当doCall
方法和WaitGroup.Wait
方法返回时,这两个值就会返回给Do
的调用者

DoChan
通过管道将信息同步给调用方,底层最终也是异步调用doCall

DoChan
Do
的区别就是,DoChan
使用goroutine异步执行doCall
并向call
持有的chans
切片中追加chan Result
,实现异步传值。一旦调用的函数返回了错误,所有在等待的goroutine也会收到同样的错误

Forget
可以通知singleflight
在持有的映射表中删除某个键

总结

  • Mutex
    互斥锁,有普通模式和饥饿模式
  • RWMutex
    读写互斥锁
  • WaitGroup
    等待一组goroutine结束
  • Once
    程序运行期间仅执行一次
  • Cond
    发生指定事件时唤醒
  • ErrGroup
    为一组goroutine提供同步,错误传播以及上下文取消的功能
  • Semaphore
    带权重的信号量
  • SingleFlight
    抑制对下游的重复请求

参考资料

  • Golang并发编程
  • Golang src


文章转载自专业造轮子,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论