一、基于zk的分布式锁
在单体应用中我们可以通过synchronized或ReentrantLock来实现锁,但是在分布式系统中就不好使了,因为synchronized和ReentrantLock只能作用于一个JVM,这时候就只能借助第三方组件,比如Redisson、数据库,当然,zk同样可以。
1.1、实现原理
类型 | 描述 |
PERSISTENT | 持久节点 |
PERSISTENT_SEQUENTIAL | 持久序号节点 |
EPHEMERAL | 临时节点 |
EPHEMERAL_SEQUENTIAL | 临时序号节点 |
节点的生命周期和客户端的会话绑定,一旦客户端和zk的会话失效,这个节点就会被删除
父节点会维护子节点的创建顺序,为每个子节点分配一个递增的序号,以后缀的形式体现在子节点的名称中
1.2、实现思路
客户端调用zk的create()方法,创建一个名字为/lock的临时序号节点
客户端调用zk的getChildren()方法获取/lock下所有的子节点,同时添加对/lock的节点监听
客户端获取到所有的子节点path后,如果发现自己在第1步中创建的节点是所有节点中最小的,就认为自己获得了锁
如果在第3步中发现不是最小的,那么等待下一次子节点变更通知的时候,再进行第3步
如果某个节点被删除,监听它的客户端就会收到通知,然后执行第3步获得锁
客户端释放锁很简单,就是在执行完业务逻辑或者发生异常后删除自己创建的节点即可
客户端调用zk的create()方法,创建一个名字为/lock的临时序号节点
客户端调用zk的getChildren()方法获取/lock下所有的子节点
客户端获取到所有的子节点path后,如果发现自己在第1步中创建的节点是所有节点中最小的,就认为自己获得了锁
如果在第3步中发现不是最小的,那么找到比自己小的相邻节点并对其添加监听
如果某个节点被删除,监听它的客户端就会收到通知,然后执行第3步获得锁
客户端释放锁很简单,就是在执行完业务逻辑或者发生异常后删除自己创建的节点即可
1.3、实现代码
package com.ayo.zklock;import com.ayo.zk.ZooKeeperClient;import org.apache.zookeeper.*;import org.apache.zookeeper.data.Stat;import java.util.ArrayList;import java.util.Collections;import java.util.List;import java.util.concurrent.CountDownLatch;import java.util.concurrent.TimeUnit;import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.Lock;public class Distributed_ZKLock implements Lock, Watcher {private ZooKeeper zk = null;// 根节点private String ROOT_LOCK = "/lock";// 竞争的资源private String lockName;// 等待的前一个锁private String WAIT_LOCK;// 当前锁private String CURRENT_LOCK;// 计数器private CountDownLatch countDownLatch;private int sessionTimeout = 10000;public Distributed_ZKLock(String lockName) {this.lockName = lockName;try {zk = new ZooKeeper("192.168.209.128:2181,192.168.209.129:2181,192.168.209.130:2181", sessionTimeout, this);Stat stat = zk.exists(ROOT_LOCK, false);if (stat == null) {//如果根节点不存在,就创建根节点,注意,根节点是持久节点zk.create(ROOT_LOCK, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);}}catch (Exception e) {e.printStackTrace();}}@Overridepublic void lock() {try {if (this.tryLock()) {System.out.println(Thread.currentThread().getName() + "-" + lockName + "获得了锁");return;} else {waitForLock(WAIT_LOCK, sessionTimeout);}}catch (Exception e) {e.printStackTrace();}}@Overridepublic void lockInterruptibly() throws InterruptedException {this.lock();}@Overridepublic boolean tryLock() {try {//之所以定义这个变量,是因为不同业务之间的锁都在一起放,但是lockname不同String spiltFlag = "_lock_";//创建的是临时序号节点CURRENT_LOCK = zk.create(ROOT_LOCK + "/" + lockName + spiltFlag, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);//取出来/lock节点下的所有子节点List<String> childrenNodes = zk.getChildren(ROOT_LOCK, false);//根据lockname筛选出指定业务的锁List<String> locks = new ArrayList<>();childrenNodes.forEach(node -> {//根据spiltFlag分隔开后,就是lockName和节点的序号了String _node = node.split(spiltFlag)[0];if (_node.equals(lockName)) {locks.add(node);}});//排序Collections.sort(locks);//判断当前节点是否是最小的临时序号节点,是的话,返回获取锁成功if (CURRENT_LOCK.equals(ROOT_LOCK + "/" + locks.get(0))) {return true;}//如果当前节点不是最小的节点,就找到自己的前一个节点String prevNode = CURRENT_LOCK.substring(CURRENT_LOCK.lastIndexOf("/") + 1);WAIT_LOCK = locks.get(Collections.binarySearch(locks, prevNode) - 1);} catch (Exception e) {e.printStackTrace();}return false;}@Overridepublic boolean tryLock(long time, TimeUnit unit) throws InterruptedException {try {if (this.tryLock()) {return true;}return waitForLock(WAIT_LOCK, time);} catch (Exception e) {e.printStackTrace();}return false;}private boolean waitForLock(String prev, long waitTime) throws KeeperException, InterruptedException {Stat stat = zk.exists(ROOT_LOCK + "/" + prev, true);if (stat != null) {this.countDownLatch = new CountDownLatch(1);//计数等待,若等到前一个节点消失,则precess中进行countDown,停止等待,获取锁this.countDownLatch.await(waitTime, TimeUnit.MILLISECONDS);this.countDownLatch = null;}return true;}@Overridepublic void unlock() {try {zk.delete(CURRENT_LOCK, -1);CURRENT_LOCK = null;zk.close();} catch (Exception e) {e.printStackTrace();}}@Overridepublic Condition newCondition() {return null;}@Overridepublic void process(WatchedEvent watchedEvent) {if (this.countDownLatch != null) {this.countDownLatch.countDown();}}}
package com.ayo.zklock;import org.slf4j.Logger;import org.slf4j.LoggerFactory;/*** 测试类,模拟河南人抢票*/public class TestZkLock {private static final Logger LOG = LoggerFactory.getLogger(TestZkLock.class);private Integer ticketNum = 10;class TicketThread implements Runnable {@Overridepublic void run() {Distributed_ZKLock lock = new Distributed_ZKLock("train");lock.lock();try {if (ticketNum > 0) {LOG.info(Thread.currentThread().getName() + "正在抢票,余票为:" + --ticketNum);}else {LOG.info(Thread.currentThread().getName() + "没抢住票");}}finally {lock.unlock();}}}public void ticketStart() {TicketThread thread = new TicketThread();for (int i = 1; i < 16; i++) {Thread t = new Thread(thread, "河南第" + i + "人");t.start();}}public static void main(String[] args) {new TestZkLock().ticketStart();}}




