暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

HikariCP - 源码赏析

蚂蚁的宝藏 2021-06-25
1882

100多k的包,性能如何做到极致,一起来学习源码,细节干货满满~

启动流程

  • HikariDataSource初始化

作为springboot 2.0的默认数据源连接池,HikariCP配置方式很丝滑

like this:

springboot 就会按照指定的type,来进行 HikariDataSource 的初始化。

通过反射,来构造HikariDataSource 实例。

HikariDataSource
提供了一个无参构造方法,和指定HikariConfig
的构造方法

   public HikariDataSource()
   {
      super();
      fastPathPool = null;
   }

   public HikariDataSource(HikariConfig configuration)
   {
      configuration.validate();
      configuration.copyStateTo(this);

      LOGGER.info("{} - Starting...", configuration.getPoolName());
      pool = fastPathPool = new HikariPool(this);
      LOGGER.info("{} - Start completed.", configuration.getPoolName());

      this.seal();
   }

两种方式,大有不同,带参的构造方法,会真正直接去初始化HikariCP的连接池,并同时赋值fastPathPool
,(这个fastPathPool
也是HikariCP快的一个小优化点, 后面分析)。

而springboot yml配置的方式,用的是无参构造方法,super()方法也只是设置了一些默认属性。

   public HikariConfig()
   {
      dataSourceProperties = new Properties();
      healthCheckProperties = new Properties();

      minIdle = -1;
      maxPoolSize = -1;
      maxLifetime = MAX_LIFETIME;
      connectionTimeout = CONNECTION_TIMEOUT;
      validationTimeout = VALIDATION_TIMEOUT;
      idleTimeout = IDLE_TIMEOUT;
      initializationFailTimeout = 1;
      isAutoCommit = true;

      String systemProp = System.getProperty("hikaricp.configurationFile");
      if (systemProp != null) {
         loadProperties(systemProp);
      }
   }

根本没有初始化 HikariCP的连接池,可以说干了个寂寞!

最终,

DataSourceHealthIndicator
在系统启动时,会做一次健康检查,这需要获取一个conection,(然后通过调用配置中的validationQuery语句 或者 jdbc4 支持的isValid()方法))。

而在这个获取connection的过程,才去完成连接池的初始化。

   @Override
   public Connection getConnection() throws SQLException
   {
      if (isClosed()) {
         throw new SQLException("HikariDataSource " + this + " has been closed.");
      }

      if (fastPathPool != null) {
         return fastPathPool.getConnection();
      }

      // See http://en.wikipedia.org/wiki/Double-checked_locking#Usage_in_Java
      HikariPool result = pool;
      if (result == null) {
         synchronized (this) {
            result = pool;
            if (result == null) {
               validate();
               LOGGER.info("{} - Starting...", getPoolName());
               try {
                  pool = result = new HikariPool(this);
                  this.seal();
               }
               catch (PoolInitializationException pie) {
                  if (pie.getCause() instanceof SQLException) {
                     throw (SQLException) pie.getCause();
                  }
                  else {
                     throw pie;
                  }
               }
               LOGGER.info("{} - Start completed.", getPoolName());
            }
         }
      }

      return result.getConnection();
   }

初始化HikariPool
时,为了并发安全考虑,很细节的使用的DCL。

此处也可以发现fastPathPool
的良苦用心:

getConnection 作为连接池的重要操作,如果使用HikariConfig的方式去创建dataSource,那么就可以直接从fastPathPool
来获取连接了。

所以,如果为了使用到这个小彩蛋,不妨可以使用,java Configuration代码方式去配置HikariDataSouce

  • HikariPool 初始化

在初始化HikariPool
前,会先进行一个validate()
操作,这个操作除了,对一些基本配置校验,进行缺省赋值外,


还有上篇配置文说道的,一些配置超过默写之后的重新赋值,这是造成配置不生效或者埋雷的一些关键。

校验,配置重新赋值完成后,就要完成真正的HikariPool
初始化了。

public HikariPool(final HikariConfig config)
   {
      super(config);
   // 创建ConcurrentBag
      this.connectionBag = new ConcurrentBag<>(this);
      this.suspendResumeLock = config.isAllowPoolSuspension() ? new SuspendResumeLock() : SuspendResumeLock.FAUX_LOCK;
   // 空闲线程 处理定时任务 
      this.houseKeepingExecutorService = initializeHouseKeepingExecutorService();
   // 快速预检查
      checkFailFast();
   // metrics 收集相关
      if (config.getMetricsTrackerFactory() != null) {
         setMetricsTrackerFactory(config.getMetricsTrackerFactory());
      }
      else {
         setMetricRegistry(config.getMetricRegistry());
      }
   // 健康检查注册相关
      setHealthCheckRegistry(config.getHealthCheckRegistry());
   // 处理JMX监控相关
      handleMBeans(this, true);
      ThreadFactory threadFactory = config.getThreadFactory();
      // 配置中 连接池最大数
      final int maxPoolSize = config.getMaximumPoolSize();
      // 创建maxPoolSize 大小的LinkedBlockQueue 阻塞队列,用来构造
      LinkedBlockingQueue<Runnable> addConnectionQueue = new LinkedBlockingQueue<>(maxPoolSize);
      // 镜像只读队列
      this.addConnectionQueueReadOnlyView = unmodifiableCollection(addConnectionQueue);
      // 创建 添加连接的 线程池,实际线程数只有1
      this.addConnectionExecutor = createThreadPoolExecutor(addConnectionQueue, poolName + " connection adder", threadFactory, new ThreadPoolExecutor.DiscardOldestPolicy());
      // 创建 关闭连接的 线程池,实际线程数只有1
      this.closeConnectionExecutor = createThreadPoolExecutor(maxPoolSize, poolName + " connection closer", threadFactory, new ThreadPoolExecutor.CallerRunsPolicy());
   // 空闲线程定时任务处理的 包装相关
      this.leakTaskFactory = new ProxyLeakTaskFactory(config.getLeakDetectionThreshold(), houseKeepingExecutorService);
   // 延时100ms后,开启任务,每30s执行空闲线程处理
      this.houseKeeperTask = houseKeepingExecutorService.scheduleWithFixedDelay(new HouseKeeper(), 100L, housekeepingPeriodMs, MILLISECONDS);
   // 如果配置了这两个属性,
      if (Boolean.getBoolean("com.zaxxer.hikari.blockUntilFilled") && config.getInitializationFailTimeout() > 1) {
         addConnectionExecutor.setCorePoolSize(Math.min(16, Runtime.getRuntime().availableProcessors()));
         addConnectionExecutor.setMaximumPoolSize(Math.min(16, Runtime.getRuntime().availableProcessors()));

         final long startTime = currentTime();
         while (elapsedMillis(startTime) < config.getInitializationFailTimeout() && getTotalConnections() < config.getMinimumIdle()) {
            quietlySleep(MILLISECONDS.toMillis(100));
         }

         addConnectionExecutor.setCorePoolSize(1);
         addConnectionExecutor.setMaximumPoolSize(1);
      }
   }

整个过程并不复杂,首先是对HikariPool的一些关键属性的初始化,如:

  • ConnectionBag

    这个是用保存管理池中连接的主要工具,也是HikariCP快的一个设计亮点

  • houseKeeperTask

    用来执行池中空闲线程的相关处理,每30s执行处理一次。

  • addConnectionExecutor

    用来给连接池中添加连接的线程池,单线程

  • closeConnectionExecutor

    用来移除线程池中的连接的线程池,单线程

除此之外,还有MetricRegistry, JMX相关属性的设置,所以结合Prometheus,jmx或者其他工具,也是可以实现图形化指标监控的(是不是感觉durid没那么香了)。

在这个过程中,还有个小细节,checkFailFast()


快速检测过程,会先尝试创建一个连接,如果说失败,就直接启动结束。

(所以,即使默认minimumIdle为10,在启动这一步也只是创建一个连接,剩下9个是在houseKeeperTask定时任务中处理的)

最后开启houseKeeperTask 定时任务。

至此,HikariCP 整个初始化,启动完成。


HouseKeeper: 补充&移除空闲连接

HouseKeeper这个定时任务主要是用来补充和移除连接池中的空闲连接,尽可能的保证连接池中的数量维持在minimumIdle数量。

第一次执行是在HikariPool初始化后的100秒,随后每30s执行一次。

   private final class HouseKeeper implements Runnable
   {
      private volatile long previous = plusMillis(currentTime(), -housekeepingPeriodMs);

      @Override
      public void run()
      {
         try {
            // refresh values in case they changed via MBean
            connectionTimeout = config.getConnectionTimeout();
            validationTimeout = config.getValidationTimeout();
            // 更新连接泄露处理的阈值
            leakTaskFactory.updateLeakDetectionThreshold(config.getLeakDetectionThreshold());
            catalog = (config.getCatalog() != null && !config.getCatalog().equals(catalog)) ? config.getCatalog() : catalog;

            final long idleTimeout = config.getIdleTimeout();
            final long now = currentTime();

            // Detect retrograde time, allowing +128ms as per NTP spec.
            // 处理时钟倒退case,标记evict
            if (plusMillis(now, 128) < plusMillis(previous, housekeepingPeriodMs)) {
               logger.warn("{} - Retrograde clock change detected (housekeeper delta={}), soft-evicting connections from pool.",
                           poolName, elapsedDisplayString(previous, now));
               previous = now;
               softEvictConnections();
               return;
            }
            else if (now > plusMillis(previous, (3 * housekeepingPeriodMs) / 2)) {
               // No point evicting for forward clock motion, this merely accelerates connection retirement anyway
               logger.warn("{} - Thread starvation or clock leap detected (housekeeper delta={}).", poolName, elapsedDisplayString(previous, now));
            }

            previous = now;

            String afterPrefix = "Pool ";
            // 回收符合条件的空闲连接
            if (idleTimeout > 0L && config.getMinimumIdle() < config.getMaximumPoolSize()) {
               logPoolState("Before cleanup ");
               afterPrefix = "After cleanup  ";

               final List<PoolEntry> notInUse = connectionBag.values(STATE_NOT_IN_USE);
               int toRemove = notInUse.size() - config.getMinimumIdle();
               for (PoolEntry entry : notInUse) {
                  if (toRemove > 0 && elapsedMillis(entry.lastAccessed, now) > idleTimeout && connectionBag.reserve(entry)) {
                     closeConnection(entry, "(connection has passed idleTimeout)");
                     toRemove--;
                  }
               }
            }

            logPoolState(afterPrefix);
   // 填充连接
            fillPool(); // Try to maintain minimum connections
         }
         catch (Exception e) {
            logger.error("Unexpected exception in housekeeping task", e);
         }
      }
   }

在任务中,为了防止时钟回拨,给了128ms的gap,如果还不满足,就直接标记connectionBag 中所有连接为evict,  (不是直接close连接),本轮任务就算结束了。

直到当client使用连接,调用hikariPool的getConnection(final long hardTimeout)
时,对标记为evict连接做批量移除操作(细节,又是细节)。


那在正常情况下,ntp的校准回拨不会超过128ms,

这时,task在 fillPool 进行添加连接之前,先会清理一波空闲时间超过idleTimeout的空闲线程, 保证空闲线程数不超过配置的MinnumIdle。

注意清理连接的操作有个条件,这也是上篇强调需要注意配置的地方:

if (idleTimeout > 0L && config.getMinimumIdle() < config.getMaximumPoolSize())

HikariCP - 理解并正确使用配置

所以,如果配置的mininumIdle 和 maxinumPoolSize一致的话,就不用清理空闲线程咯~

最后再进行fillPoll(),往HikariPool中补充连接, 保证及时满足请求获取连接。

   private synchronized void fillPool()
   {
      final int connectionsToAdd = Math.min(config.getMaximumPoolSize() - getTotalConnections(), config.getMinimumIdle() - getIdleConnections())
                                   - addConnectionQueueReadOnlyView.size();
      if (connectionsToAdd <= 0) logger.debug("{} - Fill pool skipped, pool is at sufficient level.", poolName);

      for (int i = 0; i < connectionsToAdd; i++) {
         addConnectionExecutor.submit((i < connectionsToAdd - 1) ? poolEntryCreator : postFillPoolEntryCreator);
      }
   }

一次要添加多少连接呢?

Math.min(config.getMaximumPoolSize() - getTotalConnections(), config.getMinimumIdle() - getIdleConnections()) - addConnectionQueueReadOnlyView.size()

这取决于两个配置: 

MaximumPoolSize
MinimumIdle

以及pool中三个值:

TotalConnections
 

idleConnections

addConnectionQueueReadOnlyView.size()

简单解释一下,

addConnectionQueueReadOnlyView。

在初始化 HikariPool的时候,

addConnectionQueueReadOnlyView是addConnectionQueue的镜像。也就是说该size等于添加队列的size,即等待新建db连接的连接数。

最后扣除等待新建db连接的连接数,就是为了防止重复新增。

所以,如果idle空闲线程一直未满的话,fillPoll() 就一次加满至MinimumIdle的连接。

否则,添加 最大连接数 - 总连接数 数量的连接。

ConcurrentBag:真正的连接池

concurrentBag是Hikari pool 用来存储维护连接的一个类,  这才是pool最底层,最核心的类,或者说真正的连接池,也是快的一个重要因素,为什么这么说呢?

因为concurrentBag的方法中有连接池的四个重要操作:borrow获取连接,requite归还连接,add添加连接,remove移除连接。

一般来说,这四个操作为了保证线程安全和一致性,会同时加一把锁。而HikarCP建立了一套标记模型, 通过在获取连接时,cas标记连接状态为已使用,归还连接时,cas标记连接未使用,弱化了borrow 和 requite理应加的“重锁”。

在concurrentBag中,有四个关键属性

  • CopyOnWriteArrayList<T> sharedList

    保存pool中所有的连接

  • ThreadLocal<List<Object>> threadList

    线程中维护的归还连接

  • AtomicInteger waiters

    等待数据库连接的线程数

  • SynchronousQueue<T> handoffQueue

    连接阻塞队列

sharedList是用来真正保存连接的集合,使用了CopyOnWriteArrayList, 这个并发集合类特点是用空间换时间,提高了获取效率, 但可能存在读的不一致。但是因为配合了cas,所以解决了这个问题。

除了通过标记 + CopyOnWriteArrayList + cas 来避免了上锁,极大的优化了borrow 和 requite的效率。

concurrentBag 还有个ThreadLocal类型的 threadList,

再归还连接时,会在当前线程保存该连接,这样,下次同一线程 继续获取时,就直接可以从 threadList 取了,真的是秒!

另外,作者真的是恐怖,极其严谨,

在并发很大的情况下,threadList 和 shardList + cas 也兜不住了,怎么办?

handoffQueue!

handoffQueue的类型为SynchronousQueue,可以暂且理解是一个阻塞队列,也就是说,为了满足更高的并发,最后才会通过handoffQueue阻塞加锁方式来保证安全。

最后,再品味一下borrow的代码

public T borrow(long timeout, final TimeUnit timeUnit) throws InterruptedException
{
   // 先从threadlocal中获取
   final List<Object> list = threadList.get();
   for (int i = list.size() - 1; i >= 0; i--) {
      final Object entry = list.remove(i);
      @SuppressWarnings("unchecked")
      final T bagEntry = weakThreadLocals ? ((WeakReference<T>) entry).get() : (T) entry;
      // cas标记状态为已使用
      if (bagEntry != null && bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
         return bagEntry;
      }
   }

   // 如果threadlocal中没有,从shardList中获取,等待连接数+1
   final int waiting = waiters.incrementAndGet();
   try {
      for (T bagEntry : sharedList) {
         if (bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
            // 遍历sharedList,cas占取连接
            if (waiting > 1) {
               // 如果waiting-1 < 最大连接数,就添加
               listener.addBagItem(waiting - 1);
            }
            return bagEntry;
         }
      }
   // 如果shardList中也没获得连接,先添加等待连接,然后再从handoffQueue中获取。
      listener.addBagItem(waiting);
      timeout = timeUnit.toNanos(timeout);
      do {
         final long start = currentTime();
         final T bagEntry = handoffQueue.poll(timeout, NANOSECONDS);
         if (bagEntry == null || bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
            return bagEntry;
         }

         timeout -= elapsedNanos(start);
      } while (timeout > 10_000);
      return null;
   }
   finally {
      waiters.decrementAndGet();
   }
}

代码基本同上述分析,不过又有个细节:关于waiters等待连接数的处理。

waiters的作用是来告诉线程池,现在还有waiters数量多的等待连接,你按要求去补充连接,最大maxPoolSize。

waiters 是个AtomicInteger 多线程共享的原子Integer类型,在threadList 本线程无可用连接时,该线程会自增一,在handoffQueue阻塞获取成功后,会补充waiters数量的连接。然后在线程执行完毕,finnally中,waiters会减一。

那正常情况,在扫描sharedList抢占连接成功时,本线程不用补充连接,但是并发情况下,为了保证能够及时补充连接,也是需要addBagItem的。

代码是这么处理的:

// If we may have stolen another waiter's connection, request another bag add.
if (waiting > 1) {
listener.addBagItem(waiting - 1);
}

因为, 如果别的线程可以在pool中得到满足,即获取到可用连接时,那么当前线程看到的waiters值就是1,所以在addBagItem时,waiters - 1就好了。


HikariCP为了名副其实, 真的是细节满满,锱铢必较!除了最重要的ConcurrentBag,以及通过标记模型弱化获取和归还连接的锁处理外,还有很多其他细节上的处理。

如为了优化ArrayList 的rangeCheck开销,以及更符合数据库连接使用情况的场景(通常情况下,同一个Connection创建了多个Statement时,后打开的Statement会先关闭。ArrayList的remove(Object)方法是从头开始遍历数组,FastList是从数组的尾部开始遍历,避免remove操作时,从头遍历的开销),单独写了FastList数据结构。



文章转载自蚂蚁的宝藏,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论