ZK锁
节点类型
会话结束或会话超时,zk会删除该节点,不会形成死锁
分布式锁采用该节点类型
1:持久型
2:持久顺序型
3:临时型
4:临时顺序型
什么是羊群效应
当1000个客户端在等待锁释放,当锁释放时,会发布一个监听事件,此时1000个客户端都会被唤醒,去争抢锁操作。
curator已做优化,监听上一个节点。比如1监听0,2监听1.
分布式锁
当获取锁时,会判断锁对象中的线程是否为当前线程,如果为当前线程则说明回去到锁,自增加1
加锁时会创建顺序节点,并检查节点序号是否最小,如果最小则回去锁,如果不是最小则创建一个临时节点参与排队,并监听锁释放消息,等待唤醒争抢锁。
依赖临时顺序节点,当会话结束或超时时zk会删除节点
锁自动释放
阻塞
可重入
缺点:
1:锁的创建依赖创建和删除临时节点,且只能通过leader服务器执行,然后将数据同步到不同的follower上。
2:性能不高
优点:
1:可靠性高
zk锁可通过curator-API实现。
业务代码
boolean hasLock = false;
try {
//加锁
hasLock = tryLock(lock, actualLockPath, timeout);
if (hasLock) {
//加锁成功dosomething
} else {
//加锁失败dosomething
}
} catch (Throwable be) {
throw new BusinessException(be);
} finally {
if (hasLock) {
//解锁
releaseLock(lock, actualLockPath);
}
}
private boolean tryLock(InterProcessMutex mutex, String lockPath, long timeout) throws LockException {
boolean hasLock = false;
try {
hasLock = mutex.acquire(timeout, TimeUnit.MILLISECONDS);
} catch (Exception e) {
throw new LockException(e);
}
return hasLock;
}
private void releaseLock(InterProcessMutex mutex, String lockPath) {
try {
mutex.release();
zookeeperCuatorClientHolder.getCuratorFrameworkInstance().delete().forPath(lockPath);
} catch (Exception e) {
e.printStackTrace();
}
}
加锁
//尝试获取锁 InterProcessMutex
@Override
public boolean acquire(long time, TimeUnit unit) throws Exception
{
return internalLock(time, unit);
}
//尝试获取锁
private boolean internalLock(long time, TimeUnit unit) throws Exception
{
//当前线程
Thread currentThread = Thread.currentThread();
//当前线程是否持有锁
//LockData对象保存 当先锁线程,次数,路径。
LockData lockData = threadData.get(currentThread);
if ( lockData != null )
{
// 持有锁则重入,计数器自增加1
lockData.lockCount.incrementAndGet();
return true;
}
//获取锁,并返回路径
String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
if ( lockPath != null )
{
//获取锁成功,需要将路径,线程保存至lockDate对象
LockData newLockData = new LockData(currentThread, lockPath);
threadData.put(currentThread, newLockData);
return true;
}
return false;
}
//试图加锁
String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception
{ //当前毫秒数,用于超时使用
final long startMillis = System.currentTimeMillis();
final Long millisToWait = (unit != null) ? unit.toMillis(time) : null;
final byte[] localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
int retryCount = 0;
String ourPath = null;
boolean hasTheLock = false;
boolean isDone = false;
while ( !isDone )
{
isDone = true;
try
{ //创建临时有序节点
ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
//循环自旋加锁
//判断是否最小节点需要,如果是表名加锁成功,返回true,否则阻塞等待
hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
}
catch ( KeeperException.NoNodeException e )
{
//有可能网络抖动等因素导致zk创建节点失败,此时可配置重试策略
if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) )
{
isDone = false;
}
else
{
throw e;
}
}
}
if ( hasTheLock )
{
return ourPath;
}
return null;
}
private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception
{
boolean haveTheLock = false;
boolean doDelete = false;
try
{
if ( revocable.get() != null )
{
client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
}
//自旋
while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock )
{ //获取所有子节点
List<String> children = getSortedChildren();
//获取当前节点序号
String sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash
//当前节点是否为最小节点
PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
if ( predicateResults.getsTheLock() )
{
//为最小节点加锁成功跳出循环
haveTheLock = true;
}
else
{ //构造比自己小得节点,便于监听
String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();
synchronized(this)
{
try
{
//同步监听比自己小节点,为了实现公平锁,避免羊群效应
client.getData().usingWatcher(watcher).forPath(previousSequencePath);
if ( millisToWait != null )
{
millisToWait -= (System.currentTimeMillis() - startMillis);
startMillis = System.currentTimeMillis();
if ( millisToWait <= 0 )
{ //等待超时则删除锁
doDelete = true;
break;
}
wait(millisToWait);
}
else
{//未设置超时则一直等待
wait();
}
}
catch ( KeeperException.NoNodeException e )
{
}
}
}
}
}
catch ( Exception e )
{
ThreadUtils.checkInterrupted(e);
doDelete = true;
throw e;
}
finally
{
if ( doDelete )
{
deleteOurPath(ourPath);
}
}
return haveTheLock;
}
解锁
@Override
public void release() throws Exception
{
Thread currentThread = Thread.currentThread();
LockData lockData = threadData.get(currentThread);
if ( lockData == null )
{ //当前线程不持有锁
throw new IllegalMonitorStateException("You do not own the lock: " + basePath);
}
int newLockCount = lockData.lockCount.decrementAndGet();
if ( newLockCount > 0 )
{ //重入锁,依次释放
return;
}
if ( newLockCount < 0 )
{ //释放为负数
throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + basePath);
}
try
{ //释放锁
internals.releaseLock(lockData.lockPath);
}
finally
{ //从map中移除持有锁对象的线程
threadData.remove(currentThread);
}
}
//释放锁
final void releaseLock(String lockPath) throws Exception
{
//发出移除监听事件
client.removeWatchers();
revocable.set(null);
//删除节点
deleteOurPath(lockPath);
}
总结
加锁流程
A:超时时间到达,超时会跳出自旋并删除节点,返回获取锁失败。
B:监听到前置节点删除,唤醒该线程,前置节点唤醒该线程后,继续自旋,继续判断该节点是否处于最小序号。(监听节点为本线程上一个节点,可以避免羊群效应,序号为zk自己创建)
1:当前线程是否持有锁
2:有锁则计数器cas+1,无锁则尝试获取锁,并将获取锁成功线程写入map,以便后续重入。
3:尝试获取锁,先创建数据节点,然后判断该节点序号是否最小值,如最小值则获取锁成功,否则阻塞等待并监听前置序号节点。
4:阻塞等待结束有两种
文章转载自魔都码农,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。




