https://github.com/go-redsync/redsync是golang实现的一个redis分布式锁,支持quorum机制,内部通过委托模式支持https://github.com/redis/go-redis客户端和https://github.com/gomodule/redigo 客户端。首先看下如何使用,然后分析下它的源码具体实现。
package mainimport (goredislib "github.com/go-redis/redis/v8""github.com/go-redsync/redsync/v4""github.com/go-redsync/redsync/v4/redis/goredis/v8")func main() {// 创建一个redis的客户端连接client := goredislib.NewClient(&goredislib.Options{Addr: "localhost:6379",})/*创建一个redis集群模式的客户端连接client := goredislib.NewClusterClient(&goredislib.ClusterOptions{Addr: []string{"localhost:6379"},})*/// 创建redsync的客户端连接池pool := goredis.NewPool(client) // or, pool := redigo.NewPool(...)// 创建redsync实例rs := redsync.New(pool)// 通过相同的key值名获取同一个互斥锁.mutexname := "my-global-mutex"//创建基于key的互斥锁mutex := rs.NewMutex(mutexname)// 对key进行加锁if err := mutex.Lock(); err != nil {panic(err)}// 锁续期if ok, err := mutex.Extend(); err != nil || !ok {panic(err)}// 释放互斥锁if ok, err := mutex.Unlock(); !ok || err != nil {panic("unlock failed")}}
可以看到,它的核心分为下面几步:
1,获取redis连接池
2,创建redsync的客户端连接池
3, 创建redsync实例
4,创建基于key的互斥锁
5,对key进行加锁
6,锁续期
7,释放互斥锁
下面我们依次按照上述几步分析下它的源码:
首先就是普通的redis-client创建连接池
// 创建redsync的客户端连接池pool := goredis.NewPool(client)
然后就是基于委托模式redis/goredis/v8/goredis.go对连接池进行封装
// NewPool returns a Goredis-based pool implementation.func NewPool(delegate redis.UniversalClient) redsyncredis.Pool {return &pool{delegate}}
type pool struct {delegate redis.UniversalClient}
创建redsync实例源码位于redsync.go,传入多个redis连接池实例就实现了红锁。
func New(pools ...redis.Pool) *Redsync {return &Redsync{pools: pools,}}
// Redsync provides a simple method for creating distributed mutexes using multiple Redis connection pools.type Redsync struct {pools []redis.Pool}
创建基于key的互斥锁,就是初始化锁需要的参数,源码位于redsync.go
// NewMutex returns a new distributed mutex with given name.func (r *Redsync) NewMutex(name string, options ...Option) *Mutex {m := &Mutex{name: name,expiry: 8 * time.Second,tries: 32,delayFunc: func(tries int) time.Duration {return time.Duration(rand.Intn(maxRetryDelayMilliSec-minRetryDelayMilliSec)+minRetryDelayMilliSec) * time.Millisecond},genValueFunc: genValue,driftFactor: 0.01,timeoutFactor: 0.05,quorum: len(r.pools)/2 + 1,pools: r.pools,}for _, o := range options {o.Apply(m)}return m}
// A Mutex is a distributed mutual exclusion lock.type Mutex struct {name stringexpiry time.Durationtries intdelayFunc DelayFuncdriftFactor float64timeoutFactor float64quorum intgenValueFunc func() (string, error)value stringuntil time.Timepools []redis.Pool}
加锁的源码位于mutex.go,所有的操作函数都封装了两个版本,带context和不带context的
func (m *Mutex) Lock() error {return m.LockContext(nil)}
加锁过程是先获取锁,如果超过一半节点上获取成功,则认为加锁成功,否则释放已经加锁的节点:
func (m *Mutex) LockContext(ctx context.Context) error {value, err := m.genValueFunc()case <-time.After(m.delayFunc(i)):n, err := func() (int, error) {ctx, cancel := context.WithTimeout(ctx, time.Duration(int64(float64(m.expiry)*m.timeoutFactor)))defer cancel()return m.actOnPoolsAsync(func(pool redis.Pool) (bool, error) {return m.acquire(ctx, pool, value)})}()now := time.Now()until := now.Add(m.expiry - now.Sub(start) - time.Duration(int64(float64(m.expiry)*m.driftFactor)))if n >= m.quorum && now.Before(until) {m.value = valuem.until = untilreturn nil}func() (int, error) {ctx, cancel := context.WithTimeout(ctx, time.Duration(int64(float64(m.expiry)*m.timeoutFactor)))defer cancel()return m.actOnPoolsAsync(func(pool redis.Pool) (bool, error) {return m.release(ctx, pool, value)})}()
如果加锁失败,会进行随机重试。从各个节点获取执行结果的逻辑抽象出了一个函数: 起多个goroutine发起请求,然后通过一个阻塞的chan来收集结果,返回上游供决策:
func (m *Mutex) actOnPoolsAsync(actFn func(redis.Pool) (bool, error)) (int, error) {ch := make(chan result)for node, pool := range m.pools {go func(node int, pool redis.Pool) {r := result{Node: node}r.Status, r.Err = actFn(pool)ch <- r}(node, pool)}for range m.pools {r := <-chif r.Status {n++} else if r.Err != nil {err = multierror.Append(err, &RedisError{Node: r.Node, Err: r.Err})} else {taken = append(taken, r.Node)err = multierror.Append(err, &ErrNodeTaken{Node: r.Node})}}if len(taken) >= m.quorum {return n, &ErrTaken{Nodes: taken}}return n, err
具体执行单节点加锁的逻辑位于
func (m *Mutex) acquire(ctx context.Context, pool redis.Pool, value string) (bool, error) {conn, err := pool.Get(ctx)reply, err := conn.SetNX(m.name, value, m.expiry)
redis/goredis/goredis.go
func (c *conn) SetNX(name string, value string, expiry time.Duration) (bool, error) {ok, err := c.delegate.SetNX(name, value, expiry).Result()return ok, noErrNil(err)}
释放锁的操作就是执行lua脚本,先判断锁是不是自己加的,如果是就释放
func (m *Mutex) release(ctx context.Context, pool redis.Pool, value string) (bool, error) {status, err := conn.Eval(deleteScript, m.name, value)
var deleteScript = redis.NewScript(1, `if redis.call("GET", KEYS[1]) == ARGV[1] thenreturn redis.call("DEL", KEYS[1])elsereturn 0end`)
释放互斥锁的逻辑和加锁类似,底层函数是一样的
func (m *Mutex) Unlock() (bool, error) {return m.UnlockContext(nil)}
func (m *Mutex) UnlockContext(ctx context.Context) (bool, error) {// UnlockContext unlocks m and returns the status of unlock.
func (m *Mutex) UnlockContext(ctx context.Context) (bool, error) {n, err := m.actOnPoolsAsync(func(pool redis.Pool) (bool, error) {return m.release(ctx, pool, m.value)})if n < m.quorum {return false, err}return true, nil}
最后看下续期操作,如果本地事务耗时特别长,锁过期时间内完不成操作就需要锁续期mutex.go
// Extend resets the mutex's expiry and returns the status of expiry extension.func (m *Mutex) Extend() (bool, error) {return m.ExtendContext(nil)}
func (m *Mutex) ExtendContext(ctx context.Context) (bool, error) {n, err := m.actOnPoolsAsync(func(pool redis.Pool) (bool, error) {return m.touch(ctx, pool, m.value, int(m.expiry/time.Millisecond))})now := time.Now()until := now.Add(m.expiry - now.Sub(start) - time.Duration(int64(float64(m.expiry)*m.driftFactor)))if now.Before(until) {m.until = untilreturn true, nil}
也是执行lua脚本,先看看是不是自己加的锁,如果是则修改过期时间
func (m *Mutex) touch(ctx context.Context, pool redis.Pool, value string, expiry int) (bool, error) {status, err := conn.Eval(touchScript, m.name, value, expiry)
var touchScript = redis.NewScript(1, `if redis.call("GET", KEYS[1]) == ARGV[1] thenreturn redis.call("PEXPIRE", KEYS[1], ARGV[2])elsereturn 0end`)
除此之外,还有一个函数,判断当前持有锁是否有效,能获取到值,和我们的value相等,说明有效。
func (m *Mutex) Valid() (bool, error) {return m.ValidContext(nil)}
func (m *Mutex) ValidContext(ctx context.Context) (bool, error) {n, err := m.actOnPoolsAsync(func(pool redis.Pool) (bool, error) {return m.valid(ctx, pool)})return n >= m.quorum, err}
func (m *Mutex) valid(ctx context.Context, pool redis.Pool) (bool, error) {reply, err := conn.Get(m.name)if err != nil {return false, err}return m.value == reply, nil
其实,到底需不需要红锁,我们需要判断我们的业务场景和我们资源配置,资源允许、可用性要求很高,那么可以使用红锁。那么红锁真的万无一失吗?其实不然,首先我们的value是一个随机值,既然是随机的,就有可能相同,相同了,必然锁失效。红锁是通过过半机制提升锁的可用性,防止单节点挂掉。如果过半节点都挂了,锁还可用吗?






