NO.1

背景

互斥条件: 某种资源一次只允许一个进程访问,即该资源一旦分配给一个进程使用,其他进程就不允许获取该资源,直到进程访问结束释放该资源
占有且等待条件: 一个进程本身占有一个或多个资源,同时还再等待其他线程释放需要的资源
不可抢占条件: 别的进程占有了一个资源,你不能直接将资源抢过来,不能强行剥夺
循环等待条件: 若干进程之间形成一种头尾相接的循环等待资源关系


当上述四个条件均满足,就会造成死锁,导致程序无法执行下去,那么怎么来解决死锁的问题呢?

当然实际解决方案有很多,我们这里只说说预防死锁的方案:
破坏“互斥”条件
就是在系统里取消互斥。若资源不被一个进程独占使用,那么死锁是肯定不会发生的。但一般来说在所列的四个条件中,“互斥”条件是无法破坏的。因此,在死锁预防里主要是破坏其他几个必要条件,而不去涉及破坏“互斥”条件
破坏"占有且等待"条件
破坏“占有并等待”条件,就是在系统中不允许进程在已获得某种资源的情况下,申请其他资源。即要想出一个办法,阻止进程在持有资源的同时申请其他资源,比如:所有的进程在开始运行之前,必须一次性地申请其在整个运行过程中所需要的全部资源
破坏“不可抢占”条件
破坏“不可抢占”条件就是允许对资源实行抢夺,如果占有某些资源的一个进程进行进一步资源请求被拒绝,则该进程必须释放它最初占有的资源
破坏“循环等待”条件
破坏“循环等待”条件的一种方法,是将系统中的所有资源统一编号,进程可在任何时刻提出资源申请,但所有申请必须按照资源的编号顺序(升序)提出。这样做就能保证系统不出现死锁


上面描述了一下,死锁的产生条件,以及一种预防方案,当我们在分布式系统中,使用分布式锁进行某些操作的时候,可能也会因为操作不当,导致系统间产生死锁,我们为了解决这个问题,可能会选择一次性的申请多个资源,这样更简单一点,当我们使用redisson来实现分布式锁的时候,能否支持呢?

NO.2

redisson使用

Config config = new Config();config.useSingleServer().setAddress("redis://192.168.1.17:6379");RedissonClient client = Redisson.create(config);RLock lock1 = client.getLock("lock1");RLock lock2 = client.getLock("lock2");RLock lock3 = client.getLock("lock3");RedissonMultiLock lock = new RedissonMultiLock(lock1, lock2, lock3);// 同时加锁:lock1 lock2 lock3// 所有的锁都上锁成功才算成功。lock.lock();lock.unlock();
使用还是比较简单,同时声明多个锁然后交给RedissonMultiLock对象来管理
通过RedissonMultiLock的lock()/unlock()来进行操作
方法表面上使用很简单,但是实现原理呢?怎么来同时申请多个锁资源的呢?接下来我们来一起分析下它的实现原理
@Overridepublic void lock() {try {lockInterruptibly();} catch (InterruptedException e) {Thread.currentThread().interrupt();}}@Overridepublic void lockInterruptibly() throws InterruptedException {lockInterruptibly(-1, null);}@Overridepublic void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {long baseWaitTime = locks.size() * 1500;long waitTime = -1;if (leaseTime == -1) {waitTime = baseWaitTime;} else {leaseTime = unit.toMillis(leaseTime);waitTime = leaseTime;if (waitTime <= 2000) {waitTime = 2000;} else if (waitTime <= baseWaitTime) {waitTime = ThreadLocalRandom.current().nextLong(waitTime/2, waitTime);} else {waitTime = ThreadLocalRandom.current().nextLong(baseWaitTime, waitTime);}}while (true) {if (tryLock(waitTime, leaseTime, TimeUnit.MILLISECONDS)) {return;}}}
根据锁的个数先计算一个基础的等待时间,如果调用者没有自己指定则就使用计算好的等待时间 默认为lockNum*1500ms
执行循环逻辑来尝试加锁,直到加锁成功跳出循环
看到这里我们发现加锁的核心逻辑就是tryLock方法,这里才是我们重点分析的地方
我们重点分析一下tryLock()方法,来一探究竟
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {long newLeaseTime = -1;if (leaseTime != -1) {if (waitTime == -1) {newLeaseTime = unit.toMillis(leaseTime);} else {newLeaseTime = unit.toMillis(waitTime)*2;}}long time = System.currentTimeMillis();long remainTime = -1;if (waitTime != -1) {remainTime = unit.toMillis(waitTime);}long lockWaitTime = calcLockWaitTime(remainTime);int failedLocksLimit = failedLocksLimit();List<RLock> acquiredLocks = new ArrayList<>(locks.size());for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) {RLock lock = iterator.next();boolean lockAcquired;try {if (waitTime == -1 && leaseTime == -1) {lockAcquired = lock.tryLock();} else {long awaitTime = Math.min(lockWaitTime, remainTime);lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);}} catch (RedisResponseTimeoutException e) {unlockInner(Arrays.asList(lock));lockAcquired = false;} catch (Exception e) {lockAcquired = false;}if (lockAcquired) {acquiredLocks.add(lock);} else {if (locks.size() - acquiredLocks.size() == failedLocksLimit()) {break;}if (failedLocksLimit == 0) {unlockInner(acquiredLocks);if (waitTime == -1) {return false;}failedLocksLimit = failedLocksLimit();acquiredLocks.clear();// reset iteratorwhile (iterator.hasPrevious()) {iterator.previous();}} else {failedLocksLimit--;}}if (remainTime != -1) {remainTime -= System.currentTimeMillis() - time;time = System.currentTimeMillis();if (remainTime <= 0) {unlockInner(acquiredLocks);return false;}}}if (leaseTime != -1) {acquiredLocks.stream().map(l -> (RedissonLock) l).map(l -> l.expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS)).forEach(f -> f.syncUninterruptibly());}return true;}protected long calcLockWaitTime(long remainTime) {return remainTime;}protected int failedLocksLimit() {return 0;}
计算一个加锁剩余的时间remainTime和尝试加锁需要等待的时间lockWaitTime(默认就等于reaminTime)
查看允许加锁失败的个数,即对所有的锁对象,允许有几个锁对象加锁失败
创建一个集合来保存加锁成功的锁对象
遍历所有的锁对象,依次调用带超时参数的tryLock()来尝试进行加锁
如果在规定时间内加锁成功,则保存到acquiredLocks中
如果加锁失败则需要判断是否允许有加锁失败,这里failedLocksLimit为0,即只要有一个锁对象尝试加锁失败,那么就要释放掉之前成功的所有锁
为什么要这样呢?这就是上面预防死锁的方案中提到的破坏不可抢占条件,这样可以预防客户端加锁的时候产生死锁
判断加锁的时间是否超时,如果超时了则释放掉所有的锁资源,这样做也会防止死锁
如果tryLock加锁失败,那么lockInterruptibly()方法的while逻辑就不会跳出循环,他会继续尝试不断的进行加锁,直到所有的锁资源都成功获取
NO.3

释放锁原理

public void unlock() {List<RFuture<Void>> futures = new ArrayList<>(locks.size());for (RLock lock : locks) {futures.add(lock.unlockAsync());}for (RFuture<Void> future : futures) {future.syncUninterruptibly();}}
我们发现这个释放锁更简单,就是遍历所有的锁资源,然后调用其自身的释放锁方法,进行释放
同步等待所有的资源释放完毕,结束运行
释放锁的原理何其简单,只是一些循环逻辑,调用不同资源本身的释放逻辑来完成锁资源释放

点个在看你最好看




