暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Let's Go — 谈一谈 sync.map 的源码实现

架构狂人 2021-07-27
632

前言

  我们知道,map
是 Go 语言中的一种无序的键值对集合,可以通过 key
以 o(1) 速度检索数据值。但是 map
并不保证并发安全,不允许多个协程并发读写。一般可以通过加读写锁 RWMutex
实现对 map
的并发安全读写。另外,Go 语言标准库中提供了一种并发安全的集合结构 sync.map
,可供开发者直接安全地使用,无需额外加锁。

  因此,本篇将从源码角度谈一谈 Go 语言 sync.map
实现并发安全读写的原理。

基本数据结构

  sync.map
数据结构的定义与实现位于 Go 标准库 src/sync/map.go 文件当中。从中我们可以看到,sync.map
中首先定义了三个结构体 Map
readOnly
以及 entry

  Map
结构体中包含了 mu
read
dirty
misses
四个成员:

  • dirty
    是一个原生 map
    类型的成员,是非并发安全的,对 dirty
    的读写需要加锁。

  • read
    atomic.Value
    类型,可以实现并发安全地存取变量,并且实际上存储的是下面的 readOnly
    结构。readOnly
    结构内部包含了一个原生 map
    ,以及一个 amended
    标志,amended
    用于标识 read
    中的数据是否与 dirty
    一致。

  • mu
    用于为 dirty
    的读写加锁;

  • misses
    用于记录读取数据时,read
    未命中的次数。

  read
dirty
中所持有的 map
,其值均为指向 entry
结构体的指针。而在 entry
结构体中包含一个 unsafe.Pointer
类型成员,用于指向任意类型的数据。

type Map struct {

mu Mutex

read atomic.Value

dirty map[interface{}]*entry

misses int
}


type readOnly struct {

m map[interface{}]*entry

amended bool
}


var expunged = unsafe.Pointer(new(interface{}))

type entry struct {

p unsafe.Pointer // *interface{}
}

  对上述所要使用到的基本结构有了一个大致了解后,下面我们来看看 sync.map
中读写数据的实现方法。

读取操作

  Load
方法即为 sync.map
中读取数据的方法,如下所示:

func (m *Map) Load(key interface{}) (value interface{}, ok bool) {
read, _ := m.read.Load().(readOnly)//获取read map
e, ok := read.m[key]//从read map中读取key对应的数据

//如果read中不存在数据,
//且read与dirty数据不一致
//那么就去查看dirty中是否存在数据
if !ok && read.amended {
m.mu.Lock()//加锁
//加锁,双重检查,以防加锁时被修改
read, _ = m.read.Load().(readOnly)
e, ok = read.m[key]

//双重检查后发现:确实read中不存在数据,
//且read与dirty数据不一致
if !ok && read.amended {
e, ok = m.dirty[key]//去dirty中读取

m.missLocked()//更新read的未命中次数misses
}
m.mu.Unlock()//数据读取完毕,解锁
}
if !ok {
//如果read和dirty中都不存在
//则返回不存在
return nil, false
}
//如果read或者dirty中存在
//则读取
return e.load()
}

func (m *Map) missLocked() {
m.misses++//read未命中计数misses加一
if m.misses < len(m.dirty) {
return
}
//read未命中,需要频繁访问dirty
//如果read未命中计数misses达到dirty长度
//因此则直接将dirty提升为read,并清空dirty
m.read.Store(readOnly{m: m.dirty})
m.dirty = nil
m.misses = 0
}

func (e *entry) load() (value interface{}, ok bool) {
p := atomic.LoadPointer(&e.p)
//如果数据指针p为nil或者被标记删除
//则返回不存在
if p == nil || p == expunged {
return nil, false
}
//否则,存在则返回数据
return *(*interface{})(p), true
}

  从上述 Load
方法可以看出,sync.map
每次读取数据时,会先从 read
中读取,并且使用双重检查锁作两次校验。如果发现 read
中确实不存在指定 key
的值数据而且 read
dirty
数据不一致,那么就有必要查看 dirty
中是否存在指定数据。

  特别注意,只要发现 read
未命中数据,则都会执行 missLocked
方法去更新 misses
,而不管 dirty
是否命中
。由于 read
未命中,会进一步去访问 dirty
。频繁地访问 dirty
需要频繁地加解锁,十分浪费性能开销。因此,如果 read
的未命中计数 misses
值达到 dirty
长度,则直接将 dirty
map
赋予 read
,即把dirty
提升为 read
,然后清空 dirty
。这样一来,即可能会提高 read
的命中率,减少性能开销。

  如果查看后最终发现 read
dirty
中都不存在指定数据,则返回不存在。否则将会返回指定数据。

  综上所述,从 Load
方法可以看出,read
支持无锁并发读操作,当 read
未命中,才会加锁访问 dirty

写入操作

  Store
方法即为 sync.map
中存入指定键值对的方法,如下所示:

func (m *Map) Store(key, value interface{}) {
read, _ := m.read.Load().(readOnly)
//存在且未标记删除且写入成功,则返回
if e, ok := read.m[key]; ok && e.tryStore(&value) {
return
}

//不存在或已标记删除
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
if e, ok := read.m[key]; ok {//如果read中存在,但已被标记为删除
if e.unexpungeLocked() {
//如果在read中被标记为删除
//那么该entry不可能被同步过到dirty(只会同步非nil非expunged)
//因此dirty中不存在该key,需要在此将该key存入dirty
m.dirty[key] = e//
}
e.storeLocked(&value)
} else if e, ok := m.dirty[key]; ok {
//如果dirty中存在,则写入dirty
e.storeLocked(&value)
} else {
//初始时read与dirty数据一致,都为空,
//因此amended初始为false
//amended为false说明dirty为空
//因为amended默认为false
//只有当read数据同步至dirty后,amended才会被置为true
//所以amended为false就说明read数据从未同步至dirty
//即dirty没有数据,为空
if !read.amended {
m.dirtyLocked()//将read的非nil非标记删除数据同步至dirty
//amended标记为数据不一致
//因为dirty将会写入新值
m.read.Store(readOnly{m: read.m, amended: true})
}
m.dirty[key] = newEntry(value)
}
m.mu.Unlock()
}


func (e *entry) tryStore(i *interface{}) bool {
for {
p := atomic.LoadPointer(&e.p)
if p == expunged {//被标记删除
return false
}
//如果未被标记删除
//则尝试更新
if atomic.CompareAndSwapPointer(&e.p, p, unsafe.Pointer(i)) {
return true
}
}
}

//若key被标记删除,则值置为nil
func (e *entry) unexpungeLocked() (wasExpunged bool) {
return atomic.CompareAndSwapPointer(&e.p, expunged, nil)
}

//为key存入新值
func (e *entry) storeLocked(i *interface{}) {
atomic.StorePointer(&e.p, unsafe.Pointer(i))
}

//分配dirty空间,以及同步read数据至dirty
func (m *Map) dirtyLocked() {
if m.dirty != nil {
return
}
//如果dirty未创建,
//则创建并将read中的非nil非expunged数据同步过去
read, _ := m.read.Load().(readOnly)
m.dirty = make(map[interface{}]*entry, len(read.m))
for k, e := range read.m {
if !e.tryExpungeLocked() {
m.dirty[k] = e
}
}
}

  在使用 Store
方法存入键值对时,同样首先检查 read
中是否已经存在该 key
。如果已经存在该 key
,则会使用 value
更新该 key
对应的值。否则如果该 key
read
中不存在或者已被标记删除,则会进行加锁双重检查 read

  • 如果该 key
    read
    中存在但已被标记删除,说明该 key
    不可能存在于 dirty
    ,因为 read
    只会同步非 nil
    expunged
    key
    dirty
    中,因此 dirty
    中不存在该 key
    ,需要在此将该 key
    存入 dirty
  • 如果该 key
    read
    中不存在,但在 dirty
    中存在,则直接更新 dirty
    中的值即可。
  • 如果该 key
    read
    dirty
    中均不存在,则判断 amended
    是否为 false
    。因为 amended
    默认为 false
    , 只有当 dirty
    首次写入数据时,才会将 amended
    置为 true
    。因此如果判断 amended
    false
    ,则说明此次为 dirty
    首次写入新键值对,需要调用 dirtyLocked
    方法确保 dirty
    已创建好,若是首次创建,则需把 read
    中的非 nil
    expunged
    key
    同步至 dirty
    中。

  综上所述,从 Store
方法可以看出,read
支持对已存在键值对的无锁并发写操作,当 read
未命中,才会加锁操作 dirty
。并且 read
dirty
均为命中的话,就可能需要全量拷贝 read
数据至 dirty
,此时可能会影响性能。

删除操作

  接下来,我们看一下 sync.map
中删除键值对的操作,sync.map
提供了 LoadAndDelete
方法,可用于删除指定 key
的值,并返回该值,如下所示:

func (m *Map) LoadAndDelete(key interface{}) (value interface{}, loaded bool) {
read, _ := m.read.Load().(readOnly)
e, ok := read.m[key]

//如果read中不存在数据,
//且read与dirty数据不一致
//那么就去查看dirty中是否存在数据
if !ok && read.amended {
//双重检查锁
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
e, ok = read.m[key]
//双重检查后发现:确实read中不存在数据,
//且read与dirty数据不一致
if !ok && read.amended {
e, ok = m.dirty[key]//读出dirty中的数据
delete(m.dirty, key)
//该key的每次访问都会通过慢路径,直到dirty被提升为read
//尽管dirty中也可能没有该键值对
//但也要记录一次miss,以促成dirty提升为read
//当dirty提升为read后,原dirty被清除,amended为false///此时将会走快路径访问该key
m.missLocked()
}
m.mu.Unlock()
}
if ok {
//如果read或dirty中存在该键值对
//则删除
return e.delete()
}
return nil, false
}

func (e *entry) delete() (value interface{}, ok bool) {
for {
p := atomic.LoadPointer(&e.p)
if p == nil || p == expunged {
//如果该值已被标记删除
//则返回nil
return nil, false
}
if atomic.CompareAndSwapPointer(&e.p, p, nil) {
//如果该值正常存在
//则置为nil
return *(*interface{})(p), true
}
}
}

  LoadAndDelete
方法和上述的Load
方法类似,会检查 read
中是否存在该 key
,如果存在则直接调用 e.delete()
方法作删除操作。如果 read
中不存在且 read
dirty
数据不一致,那么就去查看 dirty
中是否存在该 key
:

  • 首先依然还是加双重检查锁,确认 read
    中是否存在该 key
    ;
  • 然后,如果 read
    中确认不存在,且 read
    dirty
    数据不一致,再去访问 dirty

  该 key
的每次访问都会走慢路径,直到 dirty
被提升为 read
为止。因为当 dirty
提升为 read
后,原 dirty
将会被赋予 nil
清除,此时新的  read
中的 amended
false
。再次访问该 key
时将不再进入慢路径,提升了访问效率。

  因此,尽管 dirty
中也可能没有该键值对,但也要记录一次 miss
,以促成 dirty
提升为 read
,避免再进入慢路径,徒增加解锁的性能开销。

  在 LoadAndDelete
的删除操作中,如果 read
命中,则实质上是将 read
中的该 key
entry.p
值设置为 nil
,而并没有对 read
调用原生 map
delete
方法。这也可以提升快路径的执行效率。

  read
中的这些 entry.p
nil
key
,将会在  read
同步至 dirty
时,被进一步标记为 expunged
。并且在 read
同步至 dirty
时,不会被拷贝到 dirty
中,dirty
中存在的都是非 nil
expunged
的实质存在的数据。最终当 miss
计数达到阈值时,会将 dirty
提升为新的 read
,原 read
中的数据,包括那些已删除废弃的数据,就正式等待 GC 清理了。

总结

  sync.Map
实现了两级缓存结构:read
dirty

  • read
    支持无锁并发读,以及对已存在 key
    的无锁并发写。
  • dirty
    的读写都必须加锁。

  基于该两级缓存结构,sync.Map
实现了“快路径”和“慢路径”两种访问方式:

  • 当访问命中 read
    时,将会走“快路径”方式,访问效率非常高;“快路径”下的删除操作,仅作 nil
    值标记,不执行实际删除,效率非常高;
  • 当访问未命中 read
    时,将会走“慢路径”方式,需要加锁访问 dirty
    ,访问效率较低;写入新 key
    时,必须经过“慢路径”,效率较低;
  • sync.Map
    中维护了一个 miss
    计数,记录 read
    读操作未命中的次数,用于促成 dirty
    提升为 read
    ,使得路径的访问尽可能向“快路径”靠拢。


文章转载自架构狂人,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论