暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

openGauss 线程池技术

原创 小小亮 2021-12-22
3047

openGauss在多线程架构的基础上实现了线程池。线程池机制实现了会话和处理线程分离,在大并发连接的情况下仍然能够保证系统有很好的SLA响应另外不同的线程组可绑到不同的NUMAnon-uniform memory access,非一致性内存访问)核上,天然匹配NUMA化的CPU架构,从而提升openGauss的整体性能


一、线程池原理

openGauss线程池机制原理如下图所示,图中的主要对象如下表所示

线程池机制原理

线程池对象

对象

说明

Postmaster

主线程。负责监听客户端发出的请求

ThreadPoolControler

线程池总控。负责线程池的初始化和资源管理

ThreadSessionControler

会话生命周期管理

ThreadPoolGroup

线程组。可以定义灵活的线程数量和绑核策略

ThreadPoolListener

线程组监听线程。负责事件的分发和管理

ThreadPoolWorker

工作线程

session

客户端连接的一个会话

NUMA NODE

NUMA节点表示一个线程组在NUMA结构下可以映射到一个NUMA节点上

 

这些对象相互配合实现了线程池机制,它们的主要交互过程如下。

1) 客户端向数据库发起连接请求,Postmaster线程接收到连接请求并被唤醒。Postmaster线程创建该连接对应的socket(套接字,用于描述IP地址和端口,是一个通信链的句柄),调用ThreadPoolControler函数创建会话session)数据结构。ThreadPoolControler函数遍历当前所有的Thread Group(线程组),找到当前活跃会话数量最少的Thread Group,并把最新的会话分发给该Thread Group,加入该Thread GroupepollepollLinux内核为处理大批量句柄而作了改进的poll(轮询),能显著提高程序在大量并发连接中只有少量活跃的情况下的系统CPU利用率)列表之中。
2Thread Grouplistener线程负责监听epoll列表中所有的客户连接。
3) 客户端发起任务请求,listener线程被唤醒。listener线程检查当前的Thread Group是否有空闲worker线程;如果有,则把当前会话分配给该worker线程,并唤醒该worker线程;如果没有,则把该会话放在等待队列之中。
4worker线程被唤醒后,读取客户端连接上的请求,执行相应请求,并返回请求结果。在一次事务结束(提交、回滚)或者事务超时退出的时候,worker线程的一次任务完成。worker线程将会话返回给listener线程,listener线程继续等待该会话的下一次请求。worker线程返还会话后,检查会话等待队列;如果存在等待响应请求的会话,则直接从该队列中取出新的会话并继续工作;如果没有等待响应的会话,则将自身标记为free(空闲)状态,等待listener线程唤醒。
5) 客户端断开连接时,worker线程被唤醒,关闭连接,同时清理会话相关结构,释放内存和fd(文件句柄)等资源。
6) 如果worker线程FATAL级别错误退出,退出时worker线程会从worker队列中注销掉。此时listener线程会重新启动一个新的worker线程,直到达到指定数量的worker线程。


二、线程池实现

线程池功能由GUC参数enable_thread_pool控制,该变量设置为true才能使用线程池功能。代码主要在“openGauss-server/src/gausskernel/process/threadpool目录中,下面介绍主要代码实现流程。

Postmaster线程在ServerLoop判断如果启用了线程池功能,则会调用“ThreadPoolControler::Init”函数进行线程池的初始化。在线程池初始化时,会判断NUMA节点的个数进行NUMA结构处理。相关代码如下

 if (threadPoolActivated) {
        bool enableNumaDistribute = (g_instance.shmem_cxt.numaNodeNum > 1);
        g_threadPoolControler->Init(enableNumaDistribute);
}


ThreadPoolControler::Init函数的主要作用是创建m_sessCtrl成员m_groups成员对象根据绑核策略分配线程个数,调用“ThreadPoolGroup::init”函数进行线程组的初始化,调用“ThreadPoolGroup::WaitReady”函数等待各个线程组初始化结束。创建m_scheduler成员对象并且调用ThreadPoolScheduler::StartUp”函数启动线程池调度线程。在“ThreadPoolGroup::init”函数中,创建m_listener对象启动listener线程。为ThreadWorkerSentry函数分配内存初始化每个worker的互斥量和条件变量。调用“ThreadPoolGroup::AddWorker”函数创建worker对象启动worker线程

Postmaster线程在ServerLoop如果监听到有客户端链接请求,判断启用了线程池功能,则会调用“ThreadPoolControler::DispatchSession”函数进行会话分发。相关代码如下

if (threadPoolActivated &&!(i < MAXLISTEN && t_thrd.postmaster_cxt.listen_sock_type[i] == HA_LISTEN_SOCKET))
result = g_threadPoolControler->DispatchSession(port);
/*  ThreadPoolControler::DispatchSession的代码实现如下,找到一个会话数最少的线程组,创建会话,把会话添加到线程组的监听线程中  */ 
int ThreadPoolControler::DispatchSession(Port* port)
{
    ThreadPoolGroup* grp = NULL;
    knl_session_context* sc = NULL;
    grp = FindThreadGroupWithLeastSession();
    if (grp == NULL) {
        Assert(false);
        return STATUS_ERROR;
    }
    sc = m_sessCtrl->CreateSession(port);
    if (sc == NULL)
        return STATUS_ERROR;
    grp->GetListener()->AddNewSession(sc);
    return STATUS_OK;
}


listener线程的主函数为TpoolListenerMain(ThreadPoolListener* listener)”。函数中设置线程的名字和信号处理函数创建epoll等待事件通知Postmaster线程已经准备好,调用t_pool_listener_loop函数(其实是调用ThreadPoolListener::WaitTask”函数进入等待事件状态)。如果有事件到来,调用“ThreadPoolListener::HandleConnEvent”函数找到事件对应的会话调用ThreadPoolListener::DispatchSession”函数,如果有空闲的worker线程,通知worker线程进行处理如果没有空闲的worker线程,则把会话挂到等待队列中

worker线程的主函数就是正常的SQL处理函数PostgresMain,与非线程模式相比,主要多了3处处理:

1worker线程准备就绪的通知
2等待会话通知
3) 连接退出处理

worker线程的相关代码如下

 if (IS_THREAD_POOL_WORKER) {
        u_sess->proc_cxt.MyProcPort->sock = PGINVALID_SOCKET;
        t_thrd.threadpool_cxt.worker->NotifyReady();
}
       if (IS_THREAD_POOL_WORKER) {
            t_thrd.threadpool_cxt.worker->WaitMission();
            Assert(u_sess->status != KNL_SESS_FAKE);
        }
    case 'X':
    case EOF:
                RemoveTempNamespace();
                InitThreadLocalWhenSessionExit();
                if (IS_THREAD_POOL_WORKER) {
                    t_thrd.threadpool_cxt.worker->CleanUpSession(false);
                    break;
                }


ThreadPoolWorker::WaitMission”函数的主要作用是阻塞所有系统信号,避免系统信号比如SIGHUP等中断当前的处理清除线程上的会话信息保证没有上一个会话的内容等待会话上新的请求,会话给线程进行处理,允许系统信号中断。

ThreadPoolWorker::CleanUpSession”函数的主要作用是清除会话,从Listener中去除会话,释放会话资源。

上面介绍了线程池的主要机制,综上所述,线程池主要是解决大并发的用户连接,在一定程度上可以起到流量控制的作用,即使用户的连接数很多,后端也不需要分配太多的线程。线程是OS的一种资源如果线程太多OS资源占用很多并且大量线程的调度和切换会带来昂贵的开销如果没有线程池随着连接数的增多系统的吞吐量会逐渐降低另外一方面把线程池划分为线程组可以很好匹配NUMA CPU架构的节点,提升多核情况下的访问性能。每个线程组一个监听者,避免了线程池的“惊群效应”。

「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论