
什么情况下需要锁:
在高并发的情况下,当程序对某一个资源进行操作的时候,就可能发生数据不一致的情况,例如电商系统的库存扣减、余额扣减等,对于这种场景我们一般需要对资源的操作进行上锁,如果不上锁会怎么样呢,我们通过下面用户购买10台产品的库存扣减伪代码来看一下:
//扣减库存前一般会先查询对于仓库的sku库存数量select stock_num from t_stock where sku = 'S165879' and warehouse_no = 'WS6698';//判断库存是否足够if (stock_num < 10){return Result.error("库存不足");}//当程序执行到这里时,其它线程将stock_num扣减为0//这条sql执行完之后库存数变为 -10,实际上负/超卖了boolean result = update t_stock set stock_num = stock_num - 10 where sku = 'S165879' and warehouse_no = 'WS6698';//...
上面伪代码是典型的高并发场景下没有对仓库+SKU那条数据的库存数操作进行上锁的严重事故。
锁在宏观上使用最多的分为两种:
悲观锁:对资源直接上锁,每个线程要操作这个资源时必须排队。
乐观锁:不对资源直接上锁,而是在真正修改前判断我们关注的那个对象属性的状态是否发生改变(例如判断库存数量是否被改变)。
乐观锁实现思路:
如果是DB的方式进行扣减则可以将库存数量作为乐观锁版本号来保证,伪代码实现如下:
//扣减库存前一般会先查询对于仓库的sku库存数量select stock_num from t_stock where sku = 'S165879' and warehouse_no = 'WS6698';//判断库存是否足够if (stock_num < 10){return Result.error("库存不足");}//and stock_num = stock_num 就是防止库存数量被其它线程改变,乐观锁思路!boolean result = update t_stock set stock_num = stock_num - 10 where sku = 'S165879' and warehouse_no = 'WS6698' and stock_num = stock_num;if (!result){// 这里可以重试几次return Result.error("扣减库存失败");}
and stock_num = stock_num这个SQL条件就是乐观锁的思路,它是在真正修改前判断库存数量属性是否发生变化来防止数据不一致。
但是,很多情况下尤其是高并发场景下,架构设计上不会让程序直接打到DB上,而是在DB的前面做一道缓存来抗高并发,这时就不能通过SQL的方式来实现乐观锁了。我们更多使用的可能是悲观锁,看如下的Java版本实现的伪代码:
public class XXX {private static final Object stock_lock = new Object();public boolean deductStock(int num, String sku_warehouse){//.....//直接加锁,悲观锁方式,所有并发线程排队执行synchronized(stock_lock ){// 扣减缓存中的库存int stock_num = cache.get(sku_warehouse);if (stock_num < num){return false;}stock_num = stock_num - num;cache.set(sku_warehouse, stock_num);return true;}}}
以上通过Java程序实现悲观锁的方式看起来没什么问题,但是对于任何一个高并发的系统来说,应用肯定不可能是单节点的,通常都是集群部署,这时缓存通常也是使用类似Redis这种分布式缓存中间件来实现,这种情况下synchronized这种进程内存锁的方式是不行的,我们应该改用分布式锁。下面通过Redis来实现一个分布式锁,完整代码如下:
定义一个分布式锁接口,代码如下:
public interface DistributedLock {/*** 获取分布式锁** @return true:成功 false:失败*/boolean acquire();/*** 释放分布式锁** @return true:成功 false:失败*/boolean release();}
定义一个抽象类,实现分布式锁接口并定义抽象方法(模板设计模式):
public abstract class AbstractDistributedLock implements DistributedLock {protected Logger logger = LoggerFactory.getLogger(this.getClass());/*** 获取分布式锁** @return true:成功 false:失败*/@Overridepublic boolean acquire() {return this.doAcquire();}/*** 释放分布式锁** @return true:成功 false:失败*/@Overridepublic boolean release() {return this.doRelease();}/*** 获取分布式锁** @return true:成功 false:失败*/protected abstract boolean doAcquire();/*** 释放分布式锁** @return true:成功 false:失败*/protected abstract boolean doRelease();}
定义一个分布式锁对象DTO,代码如下:
public class RedisLockDto implements Serializable {private static final long serialVersionUID = 8898765432456L;/*** 锁超时时间 单位:ms 默认30s*/private long lockTimeoutMilli = 30000;/*** 重试次数,默认重试 5次*/private int retryNum = 5;/*** 重试间隔时间 单位:ms 默认间隔 100ms*/private long intervalMilli = 100;/*** 获取锁后锁标示符*/private String lockId;// ... 省略getter setter}
Redis分布式锁核心实现代码如下:
public class RedisDistributedLock extends AbstractDistributedLock {private final RedisOperations<String, Object> redisOperations;private final RedisLockDto dto;private final Long EXEC_FAIL = -10L;定义获取锁的lua脚本private final static DefaultRedisScript<Long> LOCK_LUA_SCRIPT = new DefaultRedisScript<>("if redis.call('setnx', KEYS[1], KEYS[2]) == 1 then return redis.call('pexpire', KEYS[1], ARGV[1]) else return -10 end", Long.class);定义释放锁的lua脚本private final static DefaultRedisScript<Long> RELEASELOCK_LUA_SCRIPT = new DefaultRedisScript<>("if redis.call('get', KEYS[1]) == KEYS[2] then return redis.call('del', KEYS[1]) else return -10 end", Long.class);public RedisDistributedLock(RedisOperations<String, Object> redisOperations, RedisLockDto dto) {AssertUtils.isNull(redisOperations, "redisOperations dot not null");AssertUtils.isNull(dto, "dto dot not null");AssertUtils.isBlank(dto.getResourceId(), "resourceId dot not blank");this.redisOperations = redisOperations;this.dto = dto;}*** 获取一个UUID** @return*/private String getUuid() {return UUID.randomUUID().toString().replace("-", "");}*** 获取分布式锁** @return true:成功 false:失败*/@Overrideprotected boolean doAcquire() {锁标识符if (StringUtils.isBlank(dto.getLockId())) {dto.setLockId(this.getUuid());}List<String> keys = Arrays.asList(dto.getResourceId(), dto.getLockId());Long result = redisOperations.execute(LOCK_LUA_SCRIPT, keys, dto.getLockTimeoutMilli());获取失败,下面根据配置开始重试int i = 0;while (EXEC_FAIL.equals(result) && i < dto.getRetryNum()) {if (dto.getIntervalMilli() > 0) {try {Thread.sleep(dto.getIntervalMilli());} catch (InterruptedException ex) {logger.error("重试获取分布式锁失败,放弃获取", ex);}}result = redisOperations.execute(LOCK_LUA_SCRIPT, keys, dto.getLockTimeoutMilli());i++;}return !EXEC_FAIL.equals(result);}*** 释放分布式锁** @return true:成功 false:失败*/@Overrideprotected boolean doRelease() {Long result = null;try {List<String> keys = Arrays.asList(dto.getResourceId(), dto.getLockId());result = redisOperations.execute(RELEASELOCK_LUA_SCRIPT, keys);} catch (Exception e) {logger.error("release lock error", e);}return !EXEC_FAIL.equals(result);}}
通过Redis分布式锁重新实现前面扣减库存的伪代码如下:
public class XXX {@Autowiredprivate RedisTemplate<String, Object> redisTemplate;private static final Object stock_lock = new Object();public boolean deductStock(int num, String sku_warehouse) {.....分布式悲观锁方式,所有并发线程排队执行DistributedLock lock = new RedisDistributedLock(redisTemplate,new RedisLockDto()重试间隔时间.setIntervalMilli(200)资源key.setLockId(sku_warehouse)重试次数.setRetryNum(5)//锁超时时间.setLockTimeoutMilli(30000));//如果能够拿到锁则表示获取到资源了if (lock.acquire()) {try {// 扣减缓存中的库存Integer stock_num = (Integer) redisTemplate.opsForValue().get(sku_warehouse);if (stock_num < num) {return false;}stock_num = stock_num - num;redisTemplate.opsForValue().set(sku_warehouse, stock_num);return true;} finally {//释放锁lock.release();}}return false;}}
Redis分布式锁实现要点:
1、lua脚本的方式可以保证当前执行脚本所有命令是原子性的
,这点很重要!!!
2、分布式锁必须设置一个超时时间防止某个节点将资源上锁之后程序刚好OOM
挂掉了导致那个锁一直不释放,程序其它节点一直获取不到,这点也很重要!
3、释放分布式锁时不需要保证是同一个客户端自己释放自己的锁,别的客户
端在程序上是无法释放别的客户端上的锁,这里可以通过值匹配的方式来保证。
4、前面实现的分布式锁有个小小的缺点,就是理论上靠程序员自己预估出程序
可能要执行的时长来设置锁超时时间的,如果程序执行了10分钟还没结束,那
个锁可能超时了被其它客户端拿到,这样可能会发生锁失效问题,很严重,只
能尽可能设置长一点的超时时间来保证。
5、如果Redis是集群主从的话,前面实现的分布式锁在主挂掉后从接手后那个
锁就丢失了,这种情况下需要另一种叫RedLock红锁来实现了。关于红锁这里就
不过多讲解,后面有时间再出一偏文章来专题讲解。
---------- 正文结束 ----------
长按扫码关注微信公众号
Java软件编程之家





