暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

PolarDB分布式版(PolarDB-X) 私有协议2.0(2)


前文:PolarDB分布式版(PolarDB-X) 私有协议2.0(1)

主路径设计

事件驱动框架

事件驱动框架为线程安全的多线程 epoll 模型,代码主要在epoll.h文件中。 epoll loop 处理逻辑和大多数事件驱动的异步框架类似,为一个大循环。作为一个多线程模型,为提升 epoll 中的等待唤醒性能(避免多线程调用 epoll_wait 中的锁),同时保证多线程事件驱动模型的任务本地执行通用性,默认使用4线程作为基础 epoll 线程,在 epoll 上等待,处理网络事件,其他新增 worker 线程可以在 eventfd 上等待,eventfd 用于在任务队列中新增任务时唤醒线程,eventfd 同时也会注册到 epoll 上作为唤醒条件之一。 和一般的异步事件框架不同的是,由线程池同时承担数据库中的请求执行,不能使用传统的任务队列模型,因为存在数据库请求间的依赖关系(事务之间的锁等待),需要具备动态增加线程,以应对大于线程池数的 wait 以打破事务 wait 的特殊情况,同时这些线程可以优雅退出完成线程池收缩。

针对多线程事件驱动框架,还设计了以下特性以提升性能:

  • timer 使用小顶堆,一次性消费 timer 设计,重复 timer 可以重复插入,不可删除(可以回调中支持逻辑删除)
  • timer 和 work queue 使用 lock free array queue,timer 获取小顶堆的最小超时时间采用 try lock,只有一个线程去处理 timer(都是轻量级任务)
  • eventfd 唤醒后,优先重置,在大量任务堆积时尽可能唤醒更多线程
  • 统计 wait 状态计数,减少不必要的 eventfd notify 流程


自适应绑核

绑核作为处理 CPU 密集型任务的通用优化手段,XRPC 中也加入了自动的自适应绑核策略:

  • 获取当前可运行的 processor 集合(适配全局的核限制,例如 k8s 的调度绑核)
  • 根据可运行的核,按 physical id,core id,processor 排序 - 根据配置的 mt epoll 线程数,按序分配
  • epoll 主线程严格绑到一个核心上(也可配置为绑到 group 内所有核上),互不重叠,提供最优简单 TP 类请求处理和调度性能,动态扩容的线程绑定到 group 内的所有核上,提供 AP 类请求重载下更好的 CPU 负载均衡
  • 该策略适应包括 numa 在内的大部分情况,结合上述设计可以将 tcp 连接、session 和执行上下文按 epoll group 分配绑定到不同 CPU 核心及 numa 节点上

TCP context 生命周期控制

在多线程事件驱动框架下,可销毁对象的生命周期控制是个比较麻烦的事情,目前大部分资源以 TCP 连接为单位进行管理,即 TCP context 为基本的生命周期管理单元。由于多线程 epoll,可能存在摘除 fd 后,其他线程因为线程交换出去仍可以看到 TCP context 情况,造成 dangling ptr 情况,这时就需要一些手段来对 TCP context 进行保护。考虑到 EBR(epoch based reclamation) 代码复杂度,这里采用了 ref cnt + 延迟 reclaim 方式对 TCP context 进行保护,延迟采用 timer 调度(超时为2倍最大 epoll 超时时间),epoll 触发后会先通过 pre_event 加 ref,避免网络包请求处理时间过长而导致 context 被提前释放。

TCP context 无锁收包设计

为了避免惊群问题,epoll 中对 socket 的触发采用 edge trigger 模式,即收到包后仅仅会唤醒一个线程进行处理,但由于 TCP 包处理以及多线程 epoll 框架的特性,可能多个包才能组成一个完整的请求,而这多个 edge trigger 可能会唤醒不同的线程处理,则需要一个机制去保证只有一个线程去处理同一个 TCP 下的同一个请求的多个网络包,这里我们采用 spin lock try_lock 抢占保证只有一个线程收包,使用 recheck flag+retry 实现第一个收包线程继续收包,避免处理过程中新到达的包漏包,完整流程参考下面的伪代码。采用无锁无等待的设计保证不会浪费任何一个线程资源让其处于锁等待状态(可以立刻去处理 epoll 上的其他 socket 消息)。

/// 伪代码
do {
  if (UNLIKELY(!read_lock_.try_lock())) {
    recheck_.store(true, std::memory_order_release);
    std::atomic_thread_fence(std::memory_order_seq_cst);
    if (LIKELY(!read_lock_.try_lock()))
      break; /// do any possible notify task
  }

  do {
    /// clear flag before read
    recheck_.store(false, std::memory_order_relaxed);

    RECV_ROUTINE;

    if (RECV_SUCCESS) {
      recheck_.store(true, std::memory_order_relaxed);

      DEAL_PACKET_ROUTINE;
    }

  } while (recheck_.load(std::memory_order_acquire));

  read_lock_.unlock();
  std::atomic_thread_fence(std::memory_order_seq_cst);
} while (UNLIKELY(recheck_.load(std::memory_order_acquire)));

TCP context 本地执行设计

前文也提到,设计的这个多线程事件驱动框架,需要在不同任务负载下,以最优的方式进行执行调度。因此需要针对数据库请求做一系列针对性处理策略和优化,主要包含以下几点:

  • 在 recv 线程上下文做解包,充分利用 cache
  • 复用 recv buffer 连续进行解包,较少 malloc 和 memcpy/memmove 代价
  • 大包自动动态扩容 recv buffer 到 dynamic buffer,10s内无大包,完成接收后收缩 buffer,节省内存占用
  • 解包完成后,push 到 session 指令队列时,记录需要 notify 的 session(push前队列为空),尽可能较少 event notify 调用
  • 完成全部消息处理和 push 指令队列后,根据 event 目前处理情况,如果是最后 event 且是 notify set 的最后一个 session,直接当前上下文开始请求处理(本地线程执行,最大化利用 cache),其他的都推到框架的任务队列中处理,代码逻辑如下所示
/// dealing notify or direct run outside the read lock.
if (!notify_set.empty()) {
  /// last one in event queue and this last one in notify set,
  /// can run directly
  auto cnt = notify_set.size();
  for (const auto &sid : notify_set) {
    if (0 == --cnt && index + 1 == total) {
      /// last one in set and run directly
      DBG_LOG(("tcp_conn run session %lu directly", sid));
      auto s = sessions_.get_session(sid);
      if (s)
        s->run();
    } else {
      /// schedule in work task
      DBG_LOG(("tcp_conn schedule task session %lu", sid));
      epoll_.push_work((new CdelayedTask(sid, this))->gen_task());
    }
  }
}
  • 当前上下文执行和任务队列执行比例可以通过 epoll_wait 的 event 数量控制(高压力情况下,events 基本上都是满的),执行比例满足 slos_cnt-1 : 1 的比例,slots_cnt 越大,任务队列执行比例越大,如下图,epoll_events_per_thread(slos_cnt) = 4,基本上是3:1比例


TCP context 发包设计

考虑到有流程同时包正常情况下不会很大,采用阻塞模型(大部分查询结果集 TCP sndbuf 就能 hold 住,大结果集的情况,会有外置流控和缓冲机制,保证即使阻塞也不会有太大影响)。同时通过外置 mutext 避免跨 session 串包,确保 session 解耦后的数据正确性。同时每个 session 内置 encoder 自带 buffer 池,自己满了或需要 flush 时候再拿 tcp 的锁 send,保证 encoder 性能的同时减少锁 TCP 通道的时间。如下代码展示了 send 的各种报错处理。

inline int wait_send() {
  auto timeout = net_write_timeout;
  if (UNLIKELY(timeout > MAX_NET_WRITE_TIMEOUT))
    timeout = MAX_NET_WRITE_TIMEOUT;
  ::pollfd pfd{fd_, POLLOUT | POLLERR, 0};
  return ::poll(&pfd, 1, static_cast<int>(timeout));
}

/// blocking send
bool send(const void *data, size_t length) final {
  if (UNLIKELY(fd_ < 0))
    return false;
  auto ptr = reinterpret_cast<const uint8_t *>(data);
  auto retry = 0;
  while (length > 0) {
    auto iret = ::send(fd_, ptr, length, 0);
    if (UNLIKELY(iret <= 0)) {
      auto err = errno;
      if (LIKELY(EAGAIN == err || EWOULDBLOCK == err)) {
        /// need wait
        auto wait = wait_send();
        if (UNLIKELY(wait <= 0)) {
          if (wait < 0)
            tcp_warn(errno, "send poll error");
          else
            tcp_warn(0, "send net write timeout");
          fin();
          return false;
        }
        /// wait done and retry
      } else if (EINTR == err) {
        if (++retry >= 10) {
          tcp_warn(EINTR, "send error with EINTR after retry 10");
          fin();
          return false;
        }
        /// simply retry
      } else {
        /// fatal error
        tcp_err(err, "send error");
        fin();
        return false;
      }
    } else {
      retry = 0; /// clear retry
      ptr += iret;
      length -= iret;
    }
  }
  return true;
}

session 设计

MySQL 中,提供了一个供外部使用的 session 对象,即 MYSQL_SESSION,XRPC 中的会话即为对MYSQL_SESSION包装。除此之外,XRPC 还做了以下优化,以适配其和计算节点通信:

  • 自己实现结果集 encoder,send buffer
  • 自带指令队列用于流水线请求
  • 类似 TCP context 的无锁单线程执行机制,实现 session 内单线程顺序执行,快速释放其他线程资源到其他请求上
  • 优化 MYSQL_SESSION 的 valid 机制,干掉全局 session 锁
  • 优化 srv_session 中 thread local 里面 THD 等生命周期控制,消除 dangling ptr 问题

encoder 重构

结果集 encoder 参考了最新 MySQL X plugin 的设计思路,重构同时做了以下优化:

  • 使用 protobuf-lite,减少了 binary 体积
  • 完全脱离 MySQL X plugin 依赖,砍掉了很多用不到的过度抽象设计
  • 基于 protobuf 消息,底层 api 直接在 buffer 上生成消息,primitives 采用参数模板的直接对应 msg 生成 hardcode 的编码
  • 大小端机器下之间指针强转编码,优化 int16,int32,int64 编码效率,如下所示
template <> struct Fixint_length<8> {
  template <uint64_t value> static void encode(uint8_t *&out) { // NOLINT
#if defined(__BYTE_ORDER__) && (__BYTE_ORDER__ == __ORDER_BIG_ENDIAN__)
    *reinterpret_cast<uint64_t *>(out) = __builtin_bswap64(value);
    out += 8;
#else
    *reinterpret_cast<uint64_t *>(out) = value;
    out += 8;
#endif
  }

  static void encode_value(uint8_t *&out, const uint64_t value) { // NOLINT
#if defined(__BYTE_ORDER__) && (__BYTE_ORDER__ == __ORDER_BIG_ENDIAN__)
    *reinterpret_cast<uint64_t *>(out) = __builtin_bswap64(value);
    out += 8;
#else
    *reinterpret_cast<uint64_t *>(out) = value;
    out += 8;
#endif
  }
};
}

XPLAN 及 chunk encoder 重构

针对私有协议的2项重要功能: - 执行计划传输执行 - 列式数据传输

XRPC 对其进行了移植和部分编码精简优化,提高了其兼容性和修复了一些潜藏已久的bug。

可调参数&内部状态可观测设计

为了在不同平台、不同规格、不同负载下达到最优的性能,XRPC 提供了大量的可调参数:

变量名默认说明
polarx_rpc_enable_perf_hist[ON|OFF]OFF是否开启XRPC性能统计直方图(性能调优时候用)
polarx_rpc_enable_tasker[ON|OFF]ON是否允许扩展线程池
polarx_rpc_enable_thread_pool_log[ON|OFF]ON是否打开线程池log
polarx_rpc_epoll_events_per_thread[1-16]4每个epoll线程处理的epoll事件数
polarx_rpc_epoll_extra_groups[0-32]0额外的epoll线程池组数,一般不配置
polarx_rpc_epoll_group_ctx_refresh_time[1000-60000]10000每个epoll线程池组共享session刷新时间,用于释放超时的session,单位ms,默认10s
polarx_rpc_epoll_group_dynamic_threads[0-16]0每个epoll线程池组中期待的非基础(动态扩展)线程数,一般设置为0
polarx_rpc_epoll_group_dynamic_threads_shrink_time[1000-600000]10000epoll线程池组中非基础(动态扩展)线程收缩的延迟时间,用于在高并发负载下来后,扩展了的线程持续存活时间,单位ms,默认为10s
polarx_rpc_epoll_group_tasker_extend_step[1-50]2在并发上来后,基于排队任务扩展线程池,线程池扩展的步长(一次扩展多少线程)
polarx_rpc_epoll_group_tasker_multiply[1-50]3在并发上来后,基于排队任务扩展线程池,线程池扩展的阈值因数,即当 排队任务>该因子*工作线程数 时,线程池会扩展
polarx_rpc_epoll_group_thread_deadlock_check_interval[1-10000]500检查因为内部事务或者其他外部等待依赖导致死锁的检测时间,单位ms,默认500ms
polarx_rpc_epoll_group_thread_scale_thresh[0-100]2基于线程等待原因分析的线程池扩容机制,该参数用于指定至少有多少线程等待后才去扩容,实际最大允许为线程池中基础线程数-1,最小为0,默认给2
polarx_rpc_epoll_groups[0-128]0默认epoll组个数,以为epoll存在大锁,还是多组打散,默认是0,自动根据核数和每组的基础线程数计算
polarx_rpc_epoll_threads_per_group[1-128]4每个epoll组的线程数,越小锁冲突越小,但越难发挥线程池自动调度能力,默认4
polarx_rpc_epoll_timeout[1-60000]10000每次调用epoll的超时时间,单位ms,默认10s
polarx_rpc_epoll_work_queue_capacity[128-4096]256每个epoll组的任务队列深度
polarx_rpc_force_all_cores[ON|OFF]OFF是否突破执行核限制绑到所有CPU核上,默认不允许
polarx_rpc_galaxy_protocol[ON|OFF]OFF是否开启galaxy protocol协议,默认不开
polarx_rpc_galaxy_version[0-127]0galaxy protocol协议版本
polarx_rpc_max_allowed_packet[4096-1073741824]67108864XRPC的最大包限制,默认64MB
polarx_rpc_max_cached_output_buffer_pages[1-256]10每个session的默认输出缓冲大小,单位是页,每个页4K,默认10个
polarx_rpc_max_epoll_wait_total_threads[0-128]0最多允许等待epoll的线程数,默认是0,自动计算的,为 epoll组数*每个epoll的基础线程数
polarx_rpc_max_queued_messages[16-4096]128每个session允许的最大排队流水线请求深度
polarx_rpc_mcs_spin_cnt[1-10000]2000内部用到的mcs自旋锁自旋次数,默认2000,超过后yield
polarx_rpc_min_auto_epoll_groups[1-128]5.7 16 8.0 32自动计算的最少的epoll组数
polarx_rpc_multi_affinity_in_group[ON|OFF]OFF 公共云通过参数默认打开是否允许epoll组内线程绑定到多个核上,开启后tpch多个大任务倾斜长尾现象会缓解
polarx_rpc_net_write_timeout[1-7200000]10000网咯写超时时间,单位ms,默认10s
polarx_rpc_request_cache_instances[1-128]16Sql/Xplan cache的分组数,减少锁冲突,默认16
polarx_rpc_request_cache_max_length[128-1073741824]1048576允许缓存到cache的请求大小,单位字节,默认值缓存小于1MB的sql
polarx_rpc_request_cache_number[128-16384]1024Sql/Xplan cache缓存slot数,sql和xplan是单独的空间,每个都有默认1024个slot
polarx_rpc_session_poll_rwlock_spin_cnt[1-10000]1RW自旋锁自旋数,默认1,超过后yield
polarx_rpc_shared_session_lifetime[1000-3600000]60000每个epoll组*享session的最长生存时间
polarx_rpc_tcp_fixed_dealing_buf[4096-65536]4096每个tcp的解析缓冲大小,单位字节,默认4K
polarx_rpc_tcp_keep_alive[1-7200]30tcp的keep alive参数,单位s,默认30s
polarx_rpc_tcp_listen_queue[128-4096]128tcp accept队列深度,默认128
polarx_rpc_tcp_recv_buf[0-2097152]0tcp recv buffer,默认0用系统默认值
polarx_rpc_tcp_send_buf[0-2097152]0tcp send buffer,默认0用系统默认值
rpc_port[0-65536]33660XRPC端口号
rpc_use_legacy_port[ON|OFF]ON是否兼容模式使用polarx_port的值作为端口号

同时为了确保运行时观察工作状态,XRPC开放了一部分全局变量用于观察内部线程数和会话数量:

全局状态变量说明样例
polarx_rpc_initedXRPC是否启动成功ON
polarx_rpc_plan_evictxplan cache LRU中淘汰数123
polarx_rpc_plan_hitxplan cache LRU中命中数4234244
polarx_rpc_plan_missxplan cache LRU中未命中数42424
polarx_rpc_sql_evictsql cache LRU中淘汰数123
polarx_rpc_sql_hitsql cache LRU中命中数4234244
polarx_rpc_sql_misssql cache LRU中未命中数42424
polarx_rpc_tcp_closing正在关闭的TCP数0
polarx_rpc_tcp_connections当前TCP数32
polarx_rpc_threadsXRPC中的总线程数64
polarx_rpc_total_sessionsXRPC中的总session数(包含共享session)38
polarx_rpc_worker_sessionsXRPC中的工作session数(CN的后端session)32

由于内部调度的复杂性,XRPC也自带了内部高精度时钟统计各阶段的耗时直方图,便于定位性能问题和调优。

mysql> show variables like '%perf_hist%';
+-----------------------------+-------+
| Variable_name               | Value |
+-----------------------------+-------+
| polarx_rpc_enable_perf_hist | OFF   |
+-----------------------------+-------+
1 row in set (0.00 sec)

mysql> set global polarx_rpc_enable_perf_hist = 'ON';
Query OK, 0 rows affected (0.01 sec)

mysql> show variables like '%perf_hist%';
+-----------------------------+-------+
| Variable_name               | Value |
+-----------------------------+-------+
| polarx_rpc_enable_perf_hist | ON    |
+-----------------------------+-------+
1 row in set (0.00 sec)

mysql> call xrpc.perf_hist('all')\G

上述命令会开启运行时的各网络、调度、执行阶段的耗时直方图,主要有:

  • work queue,工作队列获取任务的耗时
  • recv first,收第一个网络包并处理的耗时
  • recv all,收到一个完整请求网络包并处理解码的耗时
  • schedule,一个请求从接收到调度开始执行的延迟
  • run,一个请求在 mysql 中的执行耗时

数据样例如图所示,采用指数分段直方图,有利于分析各种响应分布及长尾等情况。


通过在调用存储过程中指定不同的统计项,可以显示单独的直方图,all 会显示全部5项直方图。call xrpc.perf_hist('reset');可以重置直方图,便于在压测稳定后,观察稳态的耗时分布。

其他优化

XRPC 在开发过程中,也借鉴了不同高性能数据结构的实现,力求在网络、调度部分提供最优的性能体验:

  • 参考 rust crossbeam 中的 backoff 实现了指数级退让机制
  • 参考 mcs spin lock 等 spin lock 思路,优化内部 spin lock 和 RW spin lock
  • 参考 rust crossbeam 中的 array queue 实现无锁任务队列
  • 大量的 likely 和 unlikely 分支预测优化 - 大量的无锁算法实现


未完待续

「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论