暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

分布式锁的三种实现方式及对比分析

小罗技术笔记 2020-11-02
335

点击上方“小罗技术笔记”,关注公众号

第一时间送达实用干货


作者: yangjianzhou

来源:http://suo.im/64oeRk


一、问题介绍

日常工作中很多场景下需要用到分布式锁,例如:任务运行(多个节点同一时刻同一个任务只能在一个节点上运行(分片任务除外)),交易请求接收(前端交易请求发送时,可能由于两次提交,后端需要识别出这是一个交易)等,怎么样实现一个分布式锁呢?一般有:zookeeper、redis、database等三种实现方式。

二、分布式锁实现

1、zookeeper

zookeeper实现分布式锁的原理就是多个节点同时在一个指定的节点下面创建临时会话顺序节点,谁创建的节点序号最小,谁就获得了锁,并且其他节点就会监听序号比自己小的节点,一旦序号比自己小的节点被删除了,其他节点就会得到相应的事件,然后查看自己是否为序号最小的节点,如果是,则获取锁。流程如下:

1.1、代码实现

应用依赖:

  1.    <dependencies>

  2.        <dependency>

  3.            <groupId>org.apache.zookeeper</groupId>

  4.            <artifactId>zookeeper</artifactId>

  5.            <version>3.4.13</version>

  6.            <scope>compile</scope>

  7.            <exclusions>

  8.                <exclusion>

  9.                    <groupId>org.slf4j</groupId>

  10.                    <artifactId>slf4j-log4j12</artifactId>

  11.                </exclusion>

  12.            </exclusions>

  13.        </dependency>

  14.        <dependency>

  15.            <groupId>org.apache.curator</groupId>

  16.            <artifactId>curator-recipes</artifactId>

  17.            <version>4.0.0</version>

  18.        </dependency>

  19.    </dependencies>

客户端注入:

  1. @Configuration

  2. public class CuratorBean {

  3.    @Bean

  4.    public CuratorFramework curatorFramework() {

  5.        RetryPolicy retryPolicy = new RetryNTimes(3, 1000);

  6.        CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", retryPolicy);

  7.        return client;

  8.    }

  9. }

具体实现:

  1. package com.iwill.zookeeper.service;

  2. import org.apache.curator.framework.CuratorFramework;

  3. import org.apache.curator.framework.recipes.locks.InterProcessMutex;

  4. import org.apache.curator.utils.CloseableUtils;

  5. import org.slf4j.Logger;

  6. import org.slf4j.LoggerFactory;

  7. import org.springframework.beans.factory.DisposableBean;

  8. import org.springframework.beans.factory.InitializingBean;

  9. import org.springframework.beans.factory.annotation.Autowired;

  10. import org.springframework.stereotype.Service;

  11. import java.util.concurrent.TimeUnit;

  12. @Service

  13. public class CuratorClient implements InitializingBean, DisposableBean {

  14.    @Autowired

  15.    private CuratorFramework client;

  16.    private final Logger logger = LoggerFactory.getLogger(this.getClass());

  17.    public void execute(String lockPath, BusinessService businessService) throws Exception {

  18.        InterProcessMutex lock = new InterProcessMutex(client, lockPath);

  19.        try {

  20.            boolean acquireLockSuccess = lock.acquire(200, TimeUnit.MILLISECONDS);

  21.            if (!acquireLockSuccess) {

  22.                logger.warn("acquire lock fail , thread id : " + Thread.currentThread().getId());

  23.                return;

  24.            }

  25.            logger.info("acquire lock success ,thread id : " + Thread.currentThread().getId());

  26.            businessService.handle();

  27.        } catch (Exception exp) {

  28.            logger.error("execute throw exp", exp);

  29.        } finally {

  30.            if (lock.isOwnedByCurrentThread()) {

  31.                lock.release();

  32.            }

  33.        }

  34.    }

  35.    /**

  36.     * Invoked by a BeanFactory on destruction of a singleton.

  37.     *

  38.     * @throws Exception in case of shutdown errors.

  39.     *                   Exceptions will get logged but not rethrown to allow

  40.     *                   other beans to release their resources too.

  41.     */

  42.    @Override

  43.    public void destroy() throws Exception {

  44.        CloseableUtils.closeQuietly(client);

  45.    }

  46.    /**

  47.     * Invoked by a BeanFactory after it has set all bean properties supplied

  48.     * (and satisfied BeanFactoryAware and ApplicationContextAware).

  49.     * <p>This method allows the bean instance to perform initialization only

  50.     * possible when all bean properties have been set and to throw an

  51.     * exception in the event of misconfiguration.

  52.     *

  53.     * @throws Exception in the event of misconfiguration (such

  54.     *                   as failure to set an essential property) or if initialization fails.

  55.     */

  56.    @Override

  57.    public void afterPropertiesSet() throws Exception {

  58.        client.start();

  59.    }

  60. }

1.2、扩展分析

会话的建立与关闭:

在client.start调用后,就会创建与zookeeper服务器之间的会话链接。进入到client.start里面查看到代码如下:

  1.    @Override

  2.    public void start()

  3.    {

  4.        log.info("Starting");

  5.        if ( !state.compareAndSet(CuratorFrameworkState.LATENT, CuratorFrameworkState.STARTED) )

  6.        {

  7.            throw new IllegalStateException("Cannot be started more than once");

  8.        }  

  9.     try

  10.        {

  11.            connectionStateManager.start(); // ordering dependency - must be called before client.start()

  12.            final ConnectionStateListener listener = new ConnectionStateListener()

  13.            {

  14.                @Override

  15.                public void stateChanged(CuratorFramework client, ConnectionState newState)

  16.                {

  17.                    if ( ConnectionState.CONNECTED == newState || ConnectionState.RECONNECTED == newState )

  18.                    {

  19.                        logAsErrorConnectionErrors.set(true);

  20.                    }

  21.                }

  22.            };

  23.            this.getConnectionStateListenable().addListener(listener);

  24.            client.start();

  25.            executorService = Executors.newSingleThreadScheduledExecutor(threadFactory);

  26.            executorService.submit(new Callable<Object>()

  27.            {

  28.                @Override

  29.                public Object call() throws Exception

  30.                {

  31.                    backgroundOperationsLoop();

  32.                    return null;

  33.                }

  34.            });

  35.            if ( ensembleTracker != null )

  36.            {

  37.                ensembleTracker.start();

  38.            }

  39.            log.info(schemaSet.toDocumentation());

  40.        }

  41.        catch ( Exception e )

  42.        {

  43.            ThreadUtils.checkInterrupted(e);

  44.            handleBackgroundOperationException(null, e);

  45.        }

  46.    }

系统启动部分日志如下:

系统关闭时,系统的日志:

系统启动时zookeeper的日志:

系统关闭时zookeeper的日志:

由此可知,应用在启动的时候(client.start方法执行的时候)zookeeper客户端就会和zookeeper服务器时间建立会话,系统关闭时,客户端与zookeeper服务器的会话就关闭了。

节点创建:

跟踪lock.acquire(200, TimeUnit.MILLISECONDS)进入到

org.apache.curator.framework.recipes.locks.StandardLockInternalsDriver#createsTheLock,

代码如下:

  1.    @Override

  2.    public String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception

  3.    {

  4.        String ourPath;

  5.        if ( lockNodeBytes != null )

  6.        {

  7.            ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, lockNodeBytes);

  8.        }

  9.        else

  10.        {

  11.            ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);

  12.        }

  13.        return ourPath;

  14.    }

可以看到,创建的节点为临时会话顺序节点(EPHEMERAL_SEQUENTIAL),该节点的说明如下:

  1.    /**

  2.     * The znode will be deleted upon the client's disconnect, and its name

  3.     * will be appended with a monotonically increasing number.

  4.     */

即该节点会在客户端链接断开时被删除,还有,我们调用

org.apache.curator.framework.recipes.locks.InterProcessMutex#release

时也会删除该节点。

可重入性:

跟踪获取锁的代码进入到

org.apache.curator.framework.recipes.locks.InterProcessMutex#internalLock,

代码如下:

  1.    private boolean internalLock(long time, TimeUnit unit) throws Exception

  2.    {

  3.        /*

  4.           Note on concurrency: a given lockData instance

  5.           can be only acted on by a single thread so locking isn't necessary

  6.        */

  7.        Thread currentThread = Thread.currentThread();

  8.        LockData lockData = threadData.get(currentThread);

  9.        if ( lockData != null )

  10.        {

  11.            // re-entering

  12.            lockData.lockCount.incrementAndGet();

  13.            return true;

  14.        }

  15.        String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());

  16.        if ( lockPath != null )

  17.        {

  18.            LockData newLockData = new LockData(currentThread, lockPath);

  19.            threadData.put(currentThread, newLockData);

  20.            return true;

  21.        }

  22.        return false;

  23.    }

可以看见zookeeper的锁是可重入的,即同一个线程可以多次获取锁,只有第一次真正的去创建临时会话顺序节点,后面的获取锁都是对重入次数加1。相应的,在释放锁的时候,前面都是对锁的重入次数减1,只有最后一次才是真正的去删除节点。代码见:

  1.    @Override

  2.    public void release() throws Exception

  3.    {

  4.        /*

  5.            Note on concurrency: a given lockData instance

  6.            can be only acted on by a single thread so locking isn't necessary

  7.         */

  8.        Thread currentThread = Thread.currentThread();

  9.        LockData lockData = threadData.get(currentThread);

  10.        if ( lockData == null )

  11.        {

  12.            throw new IllegalMonitorStateException("You do not own the lock: " + basePath);

  13.        }

  14.        int newLockCount = lockData.lockCount.decrementAndGet();

  15.        if ( newLockCount > 0 )

  16.        {

  17.            return;

  18.        }

  19.        if ( newLockCount < 0 )

  20.        {

  21.            throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + basePath);

  22.        }

  23.        try

  24.        {

  25.            internals.releaseLock(lockData.lockPath);

  26.        }

  27.        finally

  28.        {

  29.            threadData.remove(currentThread);

  30.        }

  31.    }

多线程并发获取同一个锁时,服务端的数据结构如下:

  1. [zk: localhost:2181(CONNECTED) 9] ls /lock-path

  2. [_c_c38d1220-26d5-4001-9f20-5bc447f37229-lock-0000000104, _c_a2ced468-86f2-466e-bf38-0432350b65f2-lock-0000000103]

  3. [zk: localhost:2181(CONNECTED) 10]

释放锁时,会删除_c_c38d1220-26d5-4001-9f20-5bc447f37229-lock-0000000104和_c_a2ced468-86f2-466e-bf38-0432350b65f2-lock-0000000103这样的临时会话顺序节点,但是它们的父节点/lock-path不会被删除。因此,高并发的业务场景下使用zookeeper分布式锁时,会留下很多的空节点。

客户端故障检测:

正常情况下,客户端会在会话的有效期内,向服务器端发送PING 请求,来进行心跳检查,说明自己还是存活的。服务器端接收到客户端的请求后,会进行对应的客户端的会话激活,会话激活就会延长该会话的存活期。如果有会话一直没有激活,那么说明该客户端出问题了,服务器端的会话超时检测任务就会检查出那些一直没有被激活的与客户端的会话,然后进行清理,清理中有一步就是删除临时会话节点(包括临时会话顺序节点)(参见《从paxos到zookeeper分布式一致性原理与实践》“会话”一节)。这就保证了zookeeper分布锁的容错性,不会因为客户端的意外退出,导致锁一直不释放,其他客户端获取不到锁。

数据一致性:

zookeeper服务器集群一般由一个leader节点和其他的follower节点组成,数据的读写都是在leader节点上进行。当一个写请求过来时,leader节点会发起一个proposal,待大多数follower节点都返回ack之后,再发起commit,待大多数follower节点都对这个proposal进行commit了,leader才会对客户端返回请求成功;如果之后leader挂掉了,那么由于zookeeper集群的leader选举算法采用zab协议保证数据最新的follower节点当选为新的leader,所以,新的leader节点上都会有原来leader节点上提交的所有数据。这样就保证了客户端请求数据的一致性了。

CAP:

任何分布式架构都不能同时满足C(一致性)、A(可用性)、P(分区耐受性),因此,zookeeper集群在保证一致性的同时,在A和P之间做了取舍,最终选择了P,因此可用性差一点。参考:https://juejin.im/post/5afe4f285188251b8015e4b6

综上所述,zookeeper分布式锁保证了锁的容错性、一致性。但是会产生空闲节点(/lock-path),并且有些时候不可用。

2、redis

获取锁时,使用redis的命令setnx、pexpire(提供基于毫秒的过期时间,expire提供基于秒的过期时间)+ lua脚本(保证脚本中的命令被一起执行,不间断)来实现分布式锁。删除锁时,先执行get,如果获取的值是自己设置的,则执行del操作,同时,这两个操作也放在lua脚本中执行,来保证原子性。流程如下:

2.1、代码实现

项目依赖:

  1.    <dependencies>

  2.        <dependency>

  3.            <groupId>redis.clients</groupId>

  4.            <artifactId>jedis</artifactId>

  5.            <version>2.9.0</version>

  6.        </dependency>

  7.    </dependencies>

注入bean:

  1. @Configuration

  2. public class JedisBean {

  3.    @Bean

  4.    public Jedis jedis(){

  5.        Jedis jedis = new Jedis("redis://127.0.0.1:6379");

  6.        return jedis ;

  7.    }

  8. }

获取锁的代码:

lua脚本:

  1.    private static String lockScript                = "local key = KEYS[1]                            \n"

  2.                                                    + "local value = ARGV[1]                          \n"

  3.                                                    + "local expireTime = ARGV[2]                     \n"

  4.                                                    + "                                               \n"

  5.                                                    + "if (redis.call('setnx',key,value) == 1) then   \n"

  6.                                                    + "   redis.call('pexpire' , key , expireTime)    \n"

  7.                                                    + "   return 'true'                               \n"

  8.                                                    + "else                                           \n"

  9.                                                    + "   return 'false'                              \n"

  10.                                                    + "end                                            \n";

对应的java代码:

  1.    /**

  2.     * 可重入的获取锁

  3.     * 使用lua脚本来保证setnx和pexpire是一个原子操作

  4.     * 获取锁成功后,里面启动过期续约任务

  5.     *

  6.     * @param key

  7.     * @param expireTime

  8.     * @return

  9.     */

  10.    public boolean acquire(String key, long expireTime) {

  11.        Thread currentThread = Thread.currentThread();

  12.        LockData lockData = threadData.get(currentThread);

  13.        if (lockData != null) {

  14.            lockData.lockCount.incrementAndGet();

  15.            return true;

  16.        }

  17.        if (lockScriptSHA == null) {

  18.            lockScriptSHA = client.scriptLoad(lockScript);

  19.        }

  20.        String owner = generatorOwner();

  21.        boolean acquired = false;

  22.        try {

  23.            Object result = client.evalsha(lockScriptSHA, 1, key, owner, String.valueOf(expireTime));

  24.            acquired = Boolean.valueOf((String) result);

  25.        } catch (Exception exp) {

  26.            logger.error("execute eval sha throw exp", exp);

  27.        }

  28.        if (acquired) {

  29.            startExtendExpireTimeTask(key, owner, expireTime);

  30.            lockData = new LockData(currentThread, key, owner);

  31.            threadData.put(currentThread, lockData);

  32.            return true;

  33.        }

  34.        return false;

  35.    }

这里采用和zookeeper分布式锁可重入同样的方式,使得redis锁可重入。

租期延长:

由于redis没有zookeeper的会话机制来保证业务运行期间,该线程一直持有锁,而是使用redis的key的过期时间来保证,为了保证业务运行期间,一直持有锁,我们在这里人为的启用的一个任务来为获取的锁延长过期时间,以此来达到和zookeeper分布式锁同样的效果。实现如下:

lua脚本:

  1.    private static String extendExpireTimeScript    = "local key = KEYS[1]                            \n"

  2.                                                    + "local value = ARGV[1]                          \n"

  3.                                                    + "local newExpireTime = ARGV[2]                  \n"

  4.                                                    + "                                               \n"

  5.                                                    + "if (redis.call('get',key) == value)  then      \n"

  6.                                                    + "   redis.call('pexpire' , key ,newExpireTime)  \n"

  7.                                                    + "   return 'true'                               \n"

  8.                                                    + "else                                           \n"

  9.                                                    + "   return 'false'                              \n"

  10.                                                    + "end                                            \n";

该脚本保证只有自己才能延长key的过期时间,其他线程则不能进行此操作。java代码如下:

  1.    /**

  2.     * 租期续约任务,在当前线程还运行的情况下,延长过期时间

  3.     *

  4.     * @param key

  5.     * @param owner

  6.     * @param expireTime

  7.     */

  8.    private void startExtendExpireTimeTask(String key, String owner, long expireTime) {

  9.        if (extendExpireTimeScriptSHA == null) {

  10.            extendExpireTimeScriptSHA = client.scriptLoad(extendExpireTimeScript);

  11.        }

  12.        Timer timer = new Timer();

  13.        timer.schedule(new TimerTask() {

  14.            @Override

  15.            public void run() {

  16.                try {

  17.                    Object result = client.evalsha(extendExpireTimeScriptSHA, 1, key, owner, String.valueOf(expireTime));

  18.                    boolean extendSuccess = Boolean.valueOf((String) result);

  19.                    if (!extendSuccess) {

  20.                        timer.cancel();

  21.                    }

  22.                } catch (Exception exp) {

  23.                    timer.cancel();

  24.                }

  25.            }

  26.        }, 0, expireTime * 3 / 4);

  27.    }

这里的续租方式会使得性能下降,如果同一个应用中,同时很多线程去获取锁,那么就会启动很多的timer线程,这会增加系统开销。还有续租时间严重依赖与锁过期时间,如果锁过期时间很短,某一时刻客户端与redis服务器之间的网络出现网络抖动了,就可能出现该业务没执行完(执行时间稍微大于锁过期时间),导致锁过期被删除了,其他客户端就获取锁了。前一个获取锁的线程就会在无锁条件下运行。

释放锁的实现如下:

lua脚本:

  1.    private static String unlockScript              = "local key = KEYS[1]                            \n"

  2.                                                    + "local value = ARGV[1]                          \n"

  3.                                                    + "                                               \n"

  4.                                                    + "if (redis.call('get',key) == value)  then      \n"

  5.                                                    + "   redis.call('del' , key )                    \n"

  6.                                                    + "   return 'true'                               \n"

  7.                                                    + "else                                           \n"

  8.                                                    + "   return 'false'                              \n"

  9.                                                    + "end                                            \n";

该脚本保证只能获取锁的线程才可以删除该锁。对应的java实现如下:

  1.    /**

  2.     * 释放锁,获取多少次锁,就释放多少次锁

  3.     *

  4.     * @return

  5.     */

  6.    public boolean release() {

  7.        Thread currentThread = Thread.currentThread();

  8.        LockData lockData = threadData.get(currentThread);

  9.        if (lockData == null) {

  10.            throw new RuntimeException("current thread do not own lock");

  11.        }

  12.        int newLockCount = lockData.lockCount.decrementAndGet();

  13.        if (newLockCount > 0) {

  14.            return true;

  15.        }

  16.        if (newLockCount < 0) {

  17.            throw new RuntimeException("Lock count has gone negative for lock :" + lockData.key);

  18.        }

  19.        if (unlockScriptSHA == null) {

  20.            unlockScriptSHA = client.scriptLoad(unlockScript);

  21.        }

  22.        try {

  23.            Object result = client.evalsha(unlockScriptSHA, 1, lockData.key, lockData.owner);

  24.            boolean unlocked = Boolean.valueOf((String) result);

  25.            if (!unlocked) {

  26.                logger.error(String.format("unlock fail ,key = %s", lockData.key));

  27.            }

  28.        } finally {

  29.            threadData.remove(currentThread);

  30.        }

  31.        return true;

  32.    }

这里释放锁时,采用可重入的方式,同样借鉴于zookeeper分布式锁可重入的实现。

2.2、扩展分析

socket链接

redis客户端每次发送请求到服务器时,都与服务器之间建立一个socket来进行。分析如下:

跟踪代码

Object result = client.evalsha(unlockScriptSHA, 1, lockData.key, lockData.owner)进入到redis.clients.jedis.Connection#connect,

源码如下:

  1.  public void connect() {

  2.    if (!isConnected()) {

  3.      try {

  4.        socket = new Socket();

  5.        // ->@wjw_add

  6.        socket.setReuseAddress(true);

  7.        socket.setKeepAlive(true); // Will monitor the TCP connection is

  8.        // valid

  9.        socket.setTcpNoDelay(true); // Socket buffer Whetherclosed, to

  10.        // ensure timely delivery of data

  11.        socket.setSoLinger(true, 0); // Control calls close () method,

  12.        // the underlying socket is closed

  13.        // immediately

  14.        // <-@wjw_add

  15.        socket.connect(new InetSocketAddress(host, port), connectionTimeout);

  16.        socket.setSoTimeout(soTimeout);

  17.        if (ssl) {

  18.          if (null == sslSocketFactory) {

  19.            sslSocketFactory = (SSLSocketFactory)SSLSocketFactory.getDefault();

  20.          }

  21.          socket = (SSLSocket) sslSocketFactory.createSocket(socket, host, port, true);

  22.          if (null != sslParameters) {

  23.            ((SSLSocket) socket).setSSLParameters(sslParameters);

  24.          }

  25.          if ((null != hostnameVerifier) &&

  26.              (!hostnameVerifier.verify(host, ((SSLSocket) socket).getSession()))) {

  27.            String message = String.format(

  28.                "The connection to '%s' failed ssl/tls hostname verification.", host);

  29.            throw new JedisConnectionException(message);

  30.          }

  31.        }

  32.        outputStream = new RedisOutputStream(socket.getOutputStream());

  33.        inputStream = new RedisInputStream(socket.getInputStream());

  34.      } catch (IOException ex) {

  35.        broken = true;

  36.        throw new JedisConnectionException(ex);

  37.      }

  38.    }

  39.  }

调用client.close之后,就会关闭socket链接,代码如下:

  1.  public void disconnect() {

  2.    if (isConnected()) {

  3.      try {

  4.        outputStream.flush();

  5.        socket.close();

  6.      } catch (IOException ex) {

  7.        broken = true;

  8.        throw new JedisConnectionException(ex);

  9.      } finally {

  10.        IOUtils.closeQuietly(socket);

  11.      }

  12.    }

  13.  }

一个系统中jedis与redis服务器端就建立一个链接,如果这个服务中使用redis的量很大,那么这里会是一个瓶颈,因此这时可以使用jedisPool(链接池)来优化。虽然客户端与服务器端有建立链接,但是redis服务器端不会根据链接有效性去给该链接设置的key来重新设置过期时间。因此,redis分布式锁需要客户端自己去延长过期时间或者在最开始设置过期时间的时候,设置一个足够长的过期时间来满足业务一直执行完,一直持有锁。

一致性:

redis集群中leader与slave之间的数据复制是采用异步的方式(因为需要满足高性能要求),即,leader将客户端发送的写请求记录下来后,就给客户端返回响应,后续该leader的slave节点就会从该leader节点复制数据。那么就会存在这么一种可能性:leader接收了客户端的写请求,也给客户端响应了,但是该数据还没来得及复制到它对应的slave节点中,leader就crash了,从slave节点中重新选举出来的leader也不包含之前leader最后写的数据了,这时,客户端来获取同样的锁就可以获取到,这样就会在同一时刻,两个客户端持有锁。

CAP:

redis的初衷是提供一个高性能的内存存储,对客户端的请求需要很快速的作出响应,因此,高性能是一个重要目标,如果要保证leader和slave之间的数据同步一致,就会牺牲性能。setinel和cluster都实现了高可用,也保证了P,因此redis保证了CAP中的AP。

公平竞争:

上述实现的redis分布式锁不具有获取失败排队等待的情况,因此不具有偏向性。任意时刻,都是竞争获取。

综上所属,redis分布式锁具有高并发、高可用的特性,但是,在极端情况下,存在一定的问题。redis官网提供的redlock在redisson中实现了,由于它需要在大多数节点中都获取同样的锁,因此相较于但节点的锁获取,性能会有所降低。

3、database

用数据库实现分布式锁的方式和redis分布式锁的实现方式类似,这里采用数据库表的唯一键的形式。如果同一个时刻,多个线程同时向一个表中插入同样的记录,由于唯一键的原因,只能有一个线程插入成功。流程图如下:

3.1、代码实现

表结构如下:

  1. CREATE TABLE `lock_record` (

  2.  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主键',

  3.  `lock_name` varchar(50) DEFAULT NULL COMMENT '锁名称',

  4.  `expire_time` bigint(20) DEFAULT NULL COMMENT '过期时间',

  5.  `version` int(11) DEFAULT NULL COMMENT '版本号',

  6.  `lock_owner` varchar(100) DEFAULT NULL COMMENT '锁拥有者',

  7.  PRIMARY KEY (`id`),

  8.  UNIQUE KEY `lock_name` (`lock_name`)

  9. )

获取锁的代码如下:

  1.   /**

  2.     * @param lockName 锁名称

  3.     * @param lockTime 锁时间

  4.     * @return

  5.     */

  6.    public boolean acquire(String lockName, Long lockTime) {

  7.        Thread currentThread = Thread.currentThread();

  8.        LockData lockData = threadData.get(currentThread);

  9.        if (lockData != null) {

  10.            lockData.lockCount.incrementAndGet();

  11.            return true;

  12.        }

  13.        LockRecordDTO lockRecord = lockRecordMapperExt.selectByLockName(lockName);

  14.        if (lockRecord == null) {

  15.            String lockOwner = generatorOwner();

  16.            boolean acquired = tryAcquire(lockName, lockTime, lockOwner);

  17.            if (acquired) {

  18.                startExtendExpireTimeTask(lockName, lockOwner, lockTime);

  19.                lockData = new LockData(currentThread, lockName, lockOwner);

  20.                threadData.put(currentThread, lockData);

  21.            }

  22.            return acquired;

  23.        }

  24.        long lockExpireTime = lockRecord.getExpireTime();

  25.        if (lockExpireTime < System.currentTimeMillis()) {

  26.            String lockOwner = generatorOwner();

  27.            boolean acquired = tryAcquire(lockRecord, lockTime, lockOwner);

  28.            if (acquired) {

  29.                lockData = new LockData(currentThread, lockName, lockOwner);

  30.                threadData.put(currentThread, lockData);

  31.            }

  32.            return acquired;

  33.        }

  34.        return false;

  35.    }

  1.   /**

  2.     * 尝试获得锁,数据库表有设置唯一键约束,只有插入成功的线程才可以获取锁

  3.     *

  4.     * @param lockName  锁名称

  5.     * @param lockTime  锁的过期时间

  6.     * @param lockOwner 锁的拥有者

  7.     * @return

  8.     */

  9.    private boolean tryAcquire(String lockName, long lockTime, String lockOwner) {

  10.        try {

  11.            LockRecordDTO lockRecord = new LockRecordDTO();

  12.            lockRecord.setLockName(lockName);

  13.            Long expireTime = System.currentTimeMillis() + lockTime;

  14.            lockRecord.setExpireTime(expireTime);

  15.            lockRecord.setLockOwner(lockOwner);

  16.            lockRecord.setVersion(0);

  17.            int insertCount = lockRecordMapperExt.insert(lockRecord);

  18.            return insertCount == 1;

  19.        } catch (Exception exp) {

  20.            return false;

  21.        }

  22.    }

  1.    /**

  2.     * 当上一次获取锁的线程没有正确释放锁时,下一次其他线程获取锁时会调用本方法

  3.     * 当多个线程竞争获取锁时,有乐观锁控制,只有更新成功的线程才会获的锁

  4.     *

  5.     * @param lockRecord 锁记录,里面保存了上一次获取锁的拥有者信息

  6.     * @param lockTime   锁过期时间

  7.     * @param lockOwner  锁的拥有者

  8.     * @return

  9.     */

  10.    private boolean tryAcquire(LockRecordDTO lockRecord, long lockTime, String lockOwner) {

  11.        try {

  12.            Long expireTime = System.currentTimeMillis() + lockTime;

  13.            lockRecord.setExpireTime(expireTime);

  14.            lockRecord.setLockOwner(lockOwner);

  15.            int updateCount = lockRecordMapperExt.updateExpireTime(lockRecord);

  16.            return updateCount == 1;

  17.        } catch (Exception exp) {

  18.            return false;

  19.        }

  20.    }

对应乐观锁更新sql如下:

  1.  <update id="updateExpireTime" parameterType="com.iwill.db.model.LockRecordDTO">

  2.    update lock_record

  3.    set expire_time = #{expireTime},

  4.     version = version + 1

  5.    where lock_name = #{lockName} and version = #{version}

  6.  </update>

获取锁时,如果数据库中有记录且超时时间小于当前时间,说明持有锁的客户端崩溃退出了,没有正确释放锁,才会导致表中有过期的记录。这时,并发的获取锁时,只有更新成功的线程才可以获取锁。

释放锁时,只有持有锁的线程才可以释放锁,代码如下:

  1.    /**

  2.     * 释放锁

  3.     * 实现参考zookeeper的锁释放机制

  4.     */

  5.    public void release() {

  6.        Thread currentThread = Thread.currentThread();

  7.        LockData lockData = threadData.get(currentThread);

  8.        if (lockData == null) {

  9.            throw new RuntimeException("current thread do not own lock");

  10.        }

  11.        int newLockCount = lockData.lockCount.decrementAndGet();

  12.        if (newLockCount > 0) {

  13.            return;

  14.        }

  15.        if (newLockCount < 0) {

  16.            throw new RuntimeException("Lock count has gone negative for lock :" + lockData.lockName);

  17.        }

  18.        try {

  19.            lockRecordMapperExt.deleteByOwner(lockData.lockName, lockData.owner);

  20.        } finally {

  21.            threadData.remove(currentThread);

  22.        }

  23.    }

对应的底层sql如下:

  1.  <delete id="deleteByOwner" parameterType="java.util.Map">

  2.    delete from lock_record where lock_name = #{lockName} and lock_owner = #{lockOwner}

  3.  </delete>

3.2、扩展分析

数据库的方式实现分布式锁,存在一个明显的问题,就是单节点问题,这里可以通过主-备的形式来缓解,但是这样会引来数据不一致的问题。而且,数据库的方式在高并发的情况下会存在性能问题。

三、对比分析

1、zookeeper分布式锁实现简单,集群自己来保证数据一致性,但是会存在建立无用节点且多节点之间需要同步数据的问题,因此一般适合于并发量小的场景使用,例如定时任务的运行等。

2、redis分布式锁(非redlock)由于redis自己的高性能原因,会有很好的性能,但是极端情况下会存在两个客户端获取锁(可以通过监控leader故障和运维措施来缓解和解决该问题),因此适用于高并发的场景。

3、database分布式锁由于数据库本身的限制:性能不高且不满足高可用(即是存在备份,也会导致数据不一致),因此,工作中很难见到真正使用数据库来作为分布式锁的解决方案,这里使用数据库实现主要是为了理解分布式锁的实现原理。



长按二维码关注

点个在看再走呗!

文章转载自小罗技术笔记,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论