暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Redis 实现分布式锁

PiPiD 2021-12-07
179

在单机多线程环境中,通过 JAVA 提供的  volatile、ReentrantLock、synchronized 以及 concurrent 并发包下一些线程安全的类等就可以做到。

在分布式的场景中,多个进程需要以互斥的方式来访问独占的资源。这个时候就需要分布式锁。

1. 分布式锁定义

分布式锁需要同时满足以下条件:

  • 互斥性:在任意时刻,只能有一个进程持有锁。

  • 防死锁:即使有一个进程在持有锁的期间崩溃而未能主动释放锁,要有其他方式去释放锁从而保证其他进程能获取到锁。

  • 加锁和解锁的必须是同一个进程。

  • 锁的续期问题:当业务的处理时间超过了锁的超时时间,锁需要自动续期。

2. 常见的分布式锁实现方案

  • 基于 ZooKeeper 实现分布式锁

  • 基于 Redis 实现分布式锁

3. 基于 Redis 实现分布式锁

  • 指定一个 key 作为锁标记,value 值设置为唯一的用户标识

  • 当 key 不存在时才设置值,确保同一时间只有一个客户端获取到锁。确保互斥性

  • 设置一个过期时间,防止因为系统因此没有删除这个 key,导致死锁

  • 当处理完业务之后需要清除这个 key 来释放锁,清除 key 时需要校验 value 值,需要满足只有加锁的人才能释放锁

3.1 加锁

3.1.1 错误示范
public static void wrongGetLock1(String lockKey, String requestId, int expireTime) {
   Long result = jedis.setnx(lockKey, requestId);
   if (result == 1) {
       // 若在这里程序突然崩溃,则无法设置过期时间,将发生死锁
       jedis.expire(lockKey, expireTime);
  }
}

分析:错误示范中的的代码使用 sexnx
expire
两条命令来加锁。如果执行了 setnx
之后,程序崩溃,则无法设置过期时间,将发生死锁。

3.1.2 正确姿势

使用 set 的扩展命令 (SET EX PX NX)来保证加锁操作的原子性

SET key value[EX seconds][PX milliseconds][NX|XX]
  • NX:表示 key 不存在的时候,才能 set 成功,也即保证只有第一个客户端请求才能获得锁。而其他客户端请求只能等其释放锁,才能获取锁

  • EX seconds:设定 key 的过期时间,时间单位是秒

  • PX millseconds:设定 key 的过期时间,时间单位是毫秒

  • XX:表示 key 存在的时候,才能 set 成功

代码实现:

public class RedisTool {

   private static final String LOCK_SUCCESS = "OK";
   private static final String SET_IF_NOT_EXIST = "NX";
   private static final String SET_WITH_EXPIRE_TIME = "PX";

 
   public static boolean getLock(Jedis jedis, String lockKey, String requestId, int expireTime) {
       String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);

       if (LOCK_SUCCESS.equals(result)) {
           return true;
      }
       return false;
  }

}

这个方案存在的问题:

  • 锁过期释放了,业务还没有执行完

    客户端 A 中的业务执行时间超过了锁的超时时间 100s,此时 redis key 会被删除。此时,其他线程就可以加锁成功。不满足互斥性

  • 锁不可重入

    获取到锁的客户端,无法再次获取到锁

3.2 解锁

3.1.1 错误示范1
public static void wrongReleaseLock1(Jedis jedis, String lockKey) {
   jedis.del(lockKey);
}

分析:可能会出现解锁了不属于自己的锁的情况

3.1.2 错误示范2
public static void wrongReleaseLock2(Jedis jedis, String lockKey, String requestId) {

   // 判断加锁与解锁是不是同一个客户端
   if (requestId.equals(jedis.get(lockKey))) {
       // 若在此时,这把锁突然不是这个客户端的,则会误解锁
       jedis.del(lockKey);
  }

}

分析:由于执行 【判断加锁与解锁是不是同一个客户端】和 【释放锁】不是原子操作。如果调用 jedis.del() 释放锁的时候,可能这把锁已经不属于当前客户端,会解除他人加的锁。

3.1.3 正确姿势

用 lua 脚本代替:

if redis.call('get', KEYS[1]) == ARGV[1] then
   return redis.call('del', KEYS[1])
else
   return 0
end

4. 基于 Redission 的分布式锁

由于上述自己实现的加锁方案,依然存在【锁过期释放了,业务还没有执行完】和 【锁不可重入】的问题。

当前开源框架Redisson解决了这个问题。我们一起来看下Redisson底层原理图吧:

只要线程一加锁成功,就会启动一个watch dog
看门狗,它是一个后台线程,会每隔10秒检查一下,如果线程1还持有锁,那么就会不断的延长锁key的生存时间。因此,Redisson就是使用watch dog解决了「锁过期释放,业务没执行完」问题。

4.1 加锁原理

Redission 中的加锁部分的代码:

    @Override
   public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
       long time = unit.toMillis(waitTime);
       long current = System.currentTimeMillis();
       long threadId = Thread.currentThread().getId();
       // 尝试加锁, tryAcquire —> tryAcquireAsync -> tryLockInnerAsync
       Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
       // lock acquired
       if (ttl == null) {
           return true;
      }
       
       // 申请锁的耗时如多大于等于最大等待事件,则申请锁失败
       time -= System.currentTimeMillis() - current;
       if (time <= 0) {
           acquireFailed(waitTime, unit, threadId);
           return false;
      }
       
       current = System.currentTimeMillis();
       /**
        * 2.订阅锁释放事件,并通过 await 方法阻塞等待锁释放,有效的解决了无效的锁申请浪费资源的问题:
        * 基于信息量,当锁被其它资源占用时,当前线程通过 Redis 的 channel 订阅锁的释放事件,一旦锁释放会发消息通知待等待的线程进行竞争.
        *
        * 当 this.await 返回 false,说明等待时间已经超出获取锁最大等待时间,取消订阅并返回获取锁失败.
        * 当 this.await 返回 true,进入循环尝试获取锁.
        */
       RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
       // await 方法内部是用 CountDownLatch 来实现阻塞,获取 subscribe 异步执行的结果(应用了 Netty 的 Future)
       if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
           if (!subscribeFuture.cancel(false)) {
               subscribeFuture.onComplete((res, e) -> {
                   if (e == null) {
                       unsubscribe(subscribeFuture, threadId);
                  }
              });
          }
           acquireFailed(waitTime, unit, threadId);
           return false;
      }

       try {
           // 计算获取锁的总耗时,如果大于等于最大等待时间,则获取锁失败.
           time -= System.currentTimeMillis() - current;
           if (time <= 0) {
               acquireFailed(waitTime, unit, threadId);
               return false;
          }
       
            /**
            * 3.收到锁释放的信号后,在最大等待时间之内,循环一次接着一次的尝试获取锁
            * 获取锁成功,则立马返回 true,
            * 若在最大等待时间之内还没获取到锁,则认为获取锁失败,返回 false 结束循环
            */
           while (true) {
               long currentTime = System.currentTimeMillis();
               ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
               // lock acquired
               if (ttl == null) {
                   return true;
              }

               time -= System.currentTimeMillis() - currentTime;
               if (time <= 0) {
                   acquireFailed(waitTime, unit, threadId);
                   return false;
              }

/**
                * 6.阻塞等待锁(通过信号量(共享锁)阻塞,等待解锁消息):
                */                
               currentTime = System.currentTimeMillis();
               if (ttl >= 0 && ttl < time) {
                   subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
              } else {
                   subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
              }

               time -= System.currentTimeMillis() - currentTime;
               if (time <= 0) {
                   acquireFailed(waitTime, unit, threadId);
                   return false;
              }
          }
      } finally {
           // 7.无论是否获得锁,都要取消订阅解锁消息
           unsubscribe(subscribeFuture, threadId);
      }
//       return get(tryLockAsync(waitTime, leaseTime, unit));
  }

private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
       return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));
}

<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
       return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
               "if (redis.call('exists', KEYS[1]) == 0) then " +
                       "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                       "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                       "return nil; " +
                       "end; " +
                       "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                       "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                       "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                       "return nil; " +
                       "end; " +
                       "return redis.call('pttl', KEYS[1]);",
               Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
  }

其中的 lua 脚本提取出来如下:

if (redis.call('exists', KEYS[1]) == 0) then  
  redis.call('hincrby', KEYS[1], ARGV[2], 1);  
  redis.call('pexpire', KEYS[1], ARGV[1]);  
  return nil;  
end;  
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then  
   redis.call('hincrby', KEYS[1], ARGV[2], 1);  
   redis.call('pexpire', KEYS[1], ARGV[1]);  
   return nil;  
end;  
return redis.call('pttl', KEYS[1]);
  • KEYS[1]: 加锁的 key

  • ARGV[1]: 锁的超时时间

  • ARGV[2]: 加锁的客户端 ID

加锁过程:

  • 1)判断 key 值是否存在

  • 2)如果不存在,使用 hincrby
    命令设置一个 hash 结构

此时, redis 里的存储结构

127.0.0.1:6379> HGETALL myLock
1) "285475da-9152-4c83-822a-67ee2f116a79:52"
2) "1"

4.2 锁互斥机制

此时,如果客户端 2 来尝试加锁。第一个 if 判断为 false。第二个 if 判断里,通过 hexists 判断是否存在客户端2的 ID,判断结果为 false。

那么,客户端2会执行最后一句:

return redis.call('pttl', KEYS[1]);

返回 redis key 的剩余超时时间。

流程分析:

  1. 尝试获取锁,返回 null 则说明加锁成功,返回一个数值,则说明已经存在该锁,ttl 为锁的剩余存活时间。

  2. 如果此时客户端 2 进程获取锁失败,那么使用客户端 2 的线程 id(其实本质上就是进程 id)通过 Redis 的 channel 订阅锁释放的事件。如果等待的过程中一直未等到锁的释放事件通知,当超过最大等待时间则获取锁失败,返回 false,也就是第 39 行代码。如果等到了锁的释放事件的通知,则开始进入一个不断重试获取锁的循环。

  3. 循环中每次都先试着获取锁,并得到已存在的锁的剩余存活时间。如果在重试中拿到了锁,则直接返回。如果锁当前还是被占用的,那么等待释放锁的消息,具体实现使用了 JDK 的信号量 Semaphore 来阻塞线程,当锁释放并发布释放锁的消息后,信号量的 release()
    方法会被调用,此时被信号量阻塞的等待队列中的一个线程就可以继续尝试获取锁了。

4.3 锁的续期机制

客户端 1 加锁的锁 key 默认生存时间才 30 秒,如果超过了 30 秒,客户端 1 还想一直持有这把锁,怎么办呢?

Redisson 提供了一个续期机制, 只要客户端 1 一旦加锁成功,就会启动一个 Watch Dog。

代码如下:

    private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
       RFuture<Long> ttlRemainingFuture;
       if (leaseTime != -1) {
           ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
      } else {
           ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
                   TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
      }
       ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
           if (e != null) {
               return;
          }

           // lock acquired
           if (ttlRemaining == null) {
               if (leaseTime != -1) {
                   internalLockLeaseTime = unit.toMillis(leaseTime);
              } else {
                   scheduleExpirationRenewal(threadId);
              }
          }
      });
       return ttlRemainingFuture;
  }

Watch Dog 机制其实就是一个后台定时任务线程,获取锁成功之后,会将持有锁的线程放入到一个 RedissonLock.EXPIRATION_RENEWAL_MAP
里面,然后每隔 10 秒 (internalLockLeaseTime / 3)
检查一下,如果客户端 1 还持有锁 key(判断客户端是否还持有 key,其实就是遍历 EXPIRATION_RENEWAL_MAP
里面线程 id 然后根据线程 id 去 Redis 中查,如果存在就会延长 key 的时间),那么就会不断的延长锁 key 的生存时间。

4.4 可重入加锁机制

Redission 是支持可重入锁的,例如下面的代码:

@Override
public void lock() {
   RLock lock = redissonSingle.getLock("myLock");
   try {
       lock.lock();

       // 执行业务
       doBusiness();
// 同一个线程中 lock 了两次
       lock.lock();

  } catch (Exception e) {
       e.printStackTrace();
  } finally {
       // 释放了两次锁
       lock.unlock();
       lock.unlock();
       logger.info("任务执行完毕, 释放锁!");
  }
}

分析其加锁的 lua 代码:

if (redis.call('exists', KEYS[1]) == 0) then  
  redis.call('hincrby', KEYS[1], ARGV[2], 1);  
  redis.call('pexpire', KEYS[1], ARGV[1]);  
  return nil;  
end;  
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then  
   redis.call('hincrby', KEYS[1], ARGV[2], 1);  
   redis.call('pexpire', KEYS[1], ARGV[1]);  
   return nil;  
end;  
return redis.call('pttl', KEYS[1]);

执行第二次 lock 时,第一个 if 判断 key 是否存在,结果为 false。执行第二个 if 判断, 结果为 true,会执行 hincrby
的操作,对客户端加锁次数加1。此时 myLock 数据结果变为:

127.0.0.1:6379> HGETALL myLock
1) "285475da-9152-4c83-822a-67ee2f116a79:52"
2) "2"

加锁支持可重入锁,那么解锁操作也需要支持。

4.2 释放锁机制

释放锁的代码实现:

    @Override
   public RFuture<Void> unlockAsync(long threadId) {
       RPromise<Void> result = new RedissonPromise<>();
       RFuture<Boolean> future = unlockInnerAsync(threadId);

       future.onComplete((opStatus, e) -> {
           cancelExpirationRenewal(threadId);

           if (e != null) {
               result.tryFailure(e);
               return;
          }

           if (opStatus == null) {
               IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                       + id + " thread-id: " + threadId);
               result.tryFailure(cause);
               return;
          }

           result.trySuccess(null);
      });

       return result;
  }
    protected RFuture<Boolean> unlockInnerAsync(long threadId) {
       return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
               "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                       "return nil;" +
                       "end; " +
                       "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                       "if (counter > 0) then " +
                       "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                       "return 0; " +
                       "else " +
                       "redis.call('del', KEYS[1]); " +
                       "redis.call('publish', KEYS[2], ARGV[1]); " +
                       "return 1; " +
                       "end; " +
                       "return nil;",
               Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
  }

提取其中的 lua 脚本:

if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then  
return nil;
end;  

local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);  
if (counter > 0) then  
  // 将该客户端对应的锁的 hash 结构的 value 值递减为 0 后再进行删除
redis.call('pexpire', KEYS[1], ARGV[2]);  
return 0;  
else  
  // 然后再向通道名为 redisson_lock__channel publish 一条 UNLOCK_MESSAGE 信息
redis.call('del', KEYS[1]);  
redis.call('publish', KEYS[2], ARGV[1]);  
return 1;  
end;  
return nil;,
  • KEYS[1]: 加锁的 key

  • KEYS[2]: 释放锁发布订阅通道

  • ARGV[1]: 锁的超时时间

  • ARGV[2]: 锁名

  • ARGV[3]: 加锁的客户端 ID

释放锁的步骤:

  1. 删除锁(这里注意可重入锁,在上面的脚本中有详细分析)。

  2. 广播释放锁的消息,通知阻塞等待的进程(向通道名为 redisson_lock__channel
    publish 一条 UNLOCK_MESSAGE
    信息)。

  3. 取消 Watch Dog 机制,即将 RedissonLock.EXPIRATION_RENEWAL_MAP
    里面的线程 id 删除,并且 cancel 掉 Netty 的那个定时任务线程。


文章转载自PiPiD,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论