暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

golang源码分析:redsync

        https://github.com/go-redsync/redsync是golang实现的一个redis分布式锁,支持quorum机制,内部通过委托模式支持https://github.com/redis/go-redis客户端和https://github.com/gomodule/redigo 客户端。首先看下如何使用,然后分析下它的源码具体实现。

    package main


    import (
    goredislib "github.com/go-redis/redis/v8"
    "github.com/go-redsync/redsync/v4"
    "github.com/go-redsync/redsync/v4/redis/goredis/v8"
    )


    func main() {
    // 创建一个redis的客户端连接
    client := goredislib.NewClient(&goredislib.Options{
    Addr: "localhost:6379",
    })
    /*
    创建一个redis集群模式的客户端连接
    client := goredislib.NewClusterClient(&goredislib.ClusterOptions{
    Addr: []string{"localhost:6379"},
    })
    */


    // 创建redsync的客户端连接池
    pool := goredis.NewPool(client) // or, pool := redigo.NewPool(...)


    // 创建redsync实例
    rs := redsync.New(pool)


    // 通过相同的key值名获取同一个互斥锁.
    mutexname := "my-global-mutex"
    //创建基于key的互斥锁
    mutex := rs.NewMutex(mutexname)


    // 对key进行加锁
    if err := mutex.Lock(); err != nil {
    panic(err)
    }

      // 锁续期
    if ok, err := mutex.Extend(); err != nil || !ok {
    panic(err)
    }
    // 释放互斥锁
    if ok, err := mutex.Unlock(); !ok || err != nil {
    panic("unlock failed")
    }
    }


            可以看到,它的核心分为下面几步:

    1,获取redis连接池

    2,创建redsync的客户端连接池

    3, 创建redsync实例

    4,创建基于key的互斥锁

    5,对key进行加锁

    6,锁续期

    7,释放互斥锁

            下面我们依次按照上述几步分析下它的源码:

    首先就是普通的redis-client创建连接池

      // 创建redsync的客户端连接池
        pool := goredis.NewPool(client)

      然后就是基于委托模式redis/goredis/v8/goredis.go对连接池进行封装

        // NewPool returns a Goredis-based pool implementation.
        func NewPool(delegate redis.UniversalClient) redsyncredis.Pool {
        return &pool{delegate}
        }
          type pool struct {
          delegate redis.UniversalClient
          }

          创建redsync实例源码位于redsync.go,传入多个redis连接池实例就实现了红锁。

            func New(pools ...redis.Pool) *Redsync {
            return &Redsync{
            pools: pools,
            }
            }
              // Redsync provides a simple method for creating distributed mutexes using multiple Redis connection pools.
              type Redsync struct {
              pools []redis.Pool
              }

              创建基于key的互斥锁,就是初始化锁需要的参数,源码位于redsync.go

                // NewMutex returns a new distributed mutex with given name.
                func (r *Redsync) NewMutex(name string, options ...Option) *Mutex {
                m := &Mutex{
                name: name,
                expiry: 8 * time.Second,
                tries: 32,
                delayFunc: func(tries int) time.Duration {
                return time.Duration(rand.Intn(maxRetryDelayMilliSec-minRetryDelayMilliSec)+minRetryDelayMilliSec) * time.Millisecond
                },
                genValueFunc: genValue,
                driftFactor: 0.01,
                timeoutFactor: 0.05,
                quorum: len(r.pools)/2 + 1,
                pools: r.pools,
                }
                for _, o := range options {
                o.Apply(m)
                }
                return m
                }
                  // A Mutex is a distributed mutual exclusion lock.
                  type Mutex struct {
                  name string
                  expiry time.Duration




                  tries int
                  delayFunc DelayFunc




                  driftFactor float64
                  timeoutFactor float64




                  quorum int




                  genValueFunc func() (string, error)
                  value string
                  until time.Time




                  pools []redis.Pool
                  }

                  加锁的源码位于mutex.go,所有的操作函数都封装了两个版本,带context和不带context的

                    func (m *Mutex) Lock() error {
                    return m.LockContext(nil)
                    }

                    加锁过程是先获取锁,如果超过一半节点上获取成功,则认为加锁成功,否则释放已经加锁的节点:

                      func (m *Mutex) LockContext(ctx context.Context) error {
                      value, err := m.genValueFunc()
                      case <-time.After(m.delayFunc(i)):
                      n, err := func() (int, error) {
                      ctx, cancel := context.WithTimeout(ctx, time.Duration(int64(float64(m.expiry)*m.timeoutFactor)))
                      defer cancel()
                      return m.actOnPoolsAsync(func(pool redis.Pool) (bool, error) {
                      return m.acquire(ctx, pool, value)
                      })
                      }()
                      now := time.Now()
                      until := now.Add(m.expiry - now.Sub(start) - time.Duration(int64(float64(m.expiry)*m.driftFactor)))
                      if n >= m.quorum && now.Before(until) {
                      m.value = value
                      m.until = until
                      return nil
                      }
                      func() (int, error) {
                      ctx, cancel := context.WithTimeout(ctx, time.Duration(int64(float64(m.expiry)*m.timeoutFactor)))
                      defer cancel()
                      return m.actOnPoolsAsync(func(pool redis.Pool) (bool, error) {
                      return m.release(ctx, pool, value)
                      })
                      }()

                      如果加锁失败,会进行随机重试。从各个节点获取执行结果的逻辑抽象出了一个函数: 起多个goroutine发起请求,然后通过一个阻塞的chan来收集结果,返回上游供决策:

                        func (m *Mutex) actOnPoolsAsync(actFn func(redis.Pool) (bool, error)) (int, error) {
                          ch := make(chan result)
                        for node, pool := range m.pools {
                        go func(node int, pool redis.Pool) {
                        r := result{Node: node}
                        r.Status, r.Err = actFn(pool)
                        ch <- r
                        }(node, pool)
                        }
                          for range m.pools {
                        r := <-ch
                        if r.Status {
                        n++
                        } else if r.Err != nil {
                        err = multierror.Append(err, &RedisError{Node: r.Node, Err: r.Err})
                        } else {
                        taken = append(taken, r.Node)
                        err = multierror.Append(err, &ErrNodeTaken{Node: r.Node})
                        }
                        }




                        if len(taken) >= m.quorum {
                        return n, &ErrTaken{Nodes: taken}
                        }
                        return n, err

                        具体执行单节点加锁的逻辑位于

                          func (m *Mutex) acquire(ctx context.Context, pool redis.Pool, value string) (bool, error) {
                          conn, err := pool.Get(ctx)
                          reply, err := conn.SetNX(m.name, value, m.expiry)

                          redis/goredis/goredis.go

                            func (c *conn) SetNX(name string, value string, expiry time.Duration) (bool, error) {
                            ok, err := c.delegate.SetNX(name, value, expiry).Result()
                            return ok, noErrNil(err)
                            }

                            释放锁的操作就是执行lua脚本,先判断锁是不是自己加的,如果是就释放

                              func (m *Mutex) release(ctx context.Context, pool redis.Pool, value string) (bool, error) {
                              status, err := conn.Eval(deleteScript, m.name, value)
                                var deleteScript = redis.NewScript(1, `
                                if redis.call("GET", KEYS[1]) == ARGV[1] then
                                return redis.call("DEL", KEYS[1])
                                else
                                return 0
                                end
                                `)

                                释放互斥锁的逻辑和加锁类似,底层函数是一样的

                                  func (m *Mutex) Unlock() (bool, error) {
                                  return m.UnlockContext(nil)
                                  }
                                    func (m *Mutex) UnlockContext(ctx context.Context) (bool, error) {
                                    // UnlockContext unlocks m and returns the status of unlock.
                                      func (m *Mutex) UnlockContext(ctx context.Context) (bool, error) {
                                      n, err := m.actOnPoolsAsync(func(pool redis.Pool) (bool, error) {
                                      return m.release(ctx, pool, m.value)
                                      })
                                      if n < m.quorum {
                                      return false, err
                                      }
                                      return true, nil
                                      }

                                      最后看下续期操作,如果本地事务耗时特别长,锁过期时间内完不成操作就需要锁续期mutex.go

                                        // Extend resets the mutex's expiry and returns the status of expiry extension.
                                        func (m *Mutex) Extend() (bool, error) {
                                        return m.ExtendContext(nil)
                                        }
                                          func (m *Mutex) ExtendContext(ctx context.Context) (bool, error) {
                                          n, err := m.actOnPoolsAsync(func(pool redis.Pool) (bool, error) {
                                          return m.touch(ctx, pool, m.value, int(m.expiry/time.Millisecond))
                                          })
                                          now := time.Now()
                                          until := now.Add(m.expiry - now.Sub(start) - time.Duration(int64(float64(m.expiry)*m.driftFactor)))
                                          if now.Before(until) {
                                          m.until = until
                                          return true, nil
                                          }

                                          也是执行lua脚本,先看看是不是自己加的锁,如果是则修改过期时间

                                            func (m *Mutex) touch(ctx context.Context, pool redis.Pool, value string, expiry int) (bool, error) {
                                            status, err := conn.Eval(touchScript, m.name, value, expiry)
                                              var touchScript = redis.NewScript(1, `
                                              if redis.call("GET", KEYS[1]) == ARGV[1] then
                                              return redis.call("PEXPIRE", KEYS[1], ARGV[2])
                                              else
                                              return 0
                                              end
                                              `)

                                              除此之外,还有一个函数,判断当前持有锁是否有效,能获取到值,和我们的value相等,说明有效。

                                                func (m *Mutex) Valid() (bool, error) {
                                                return m.ValidContext(nil)
                                                }
                                                  func (m *Mutex) ValidContext(ctx context.Context) (bool, error) {
                                                  n, err := m.actOnPoolsAsync(func(pool redis.Pool) (bool, error) {
                                                  return m.valid(ctx, pool)
                                                  })
                                                  return n >= m.quorum, err
                                                  }
                                                    func (m *Mutex) valid(ctx context.Context, pool redis.Pool) (bool, error) {
                                                    reply, err := conn.Get(m.name)
                                                    if err != nil {
                                                    return false, err
                                                    }
                                                    return m.value == reply, nil

                                                            其实,到底需不需要红锁,我们需要判断我们的业务场景和我们资源配置,资源允许、可用性要求很高,那么可以使用红锁。那么红锁真的万无一失吗?其实不然,首先我们的value是一个随机值,既然是随机的,就有可能相同,相同了,必然锁失效。红锁是通过过半机制提升锁的可用性,防止单节点挂掉。如果过半节点都挂了,锁还可用吗?

                                                    文章转载自golang算法架构leetcode技术php,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                                                    评论