暂无图片
暂无图片
1
暂无图片
暂无图片
暂无图片

PostgreSQL 并行框架分析

1481

作者简介 

施博文,目前就职于腾讯云 PG 团队

概览
PostgreSQL 并行框架提供了一系列方便的函数,支持在插件或内核中直接调用相关函数,启动若干个后台进程进行并行操作。目前,PG 的并行框架主要用来支持并行查询。本文将介绍并行框架的相关功能,分析其核心逻辑和函数。
可能需要了解的前置知识:
shm_mq 消息队列:PG 内核提供的单生产者单消费者消息队列
动态共享内存:简称 dsm, 在数据库运行期间可以动态创建出来的共享内存
为什么需要并行框架
设想这样一个场景:我们运行了一条 SQL,运行这条 SQL 需要再拉起一个 background worker 进行并行查询。为了方便描述,我们将运行这条 SQL 的进程称为主进程,拉起的 background worker 称为从进程。
但是,从进程可能干活的时候出现了 ERROR ,甚至 CRASH。这时候我们需要通知主进程自己遇到了 ERROR,让主进程进行异常处理。
事实上,我们确实可以不依赖于并行框架实现这样的功能,但是这就需要我们自己去完成类似的异常处理机制。在这种情况下,直接使用并行框架会是一个更好的选择。
除此之外,并行框架还提供了其他的一些功能。简要的概括一下:
错误消息处理机制:在动态共享内存中存放一个消息队列,如果从进程出现异常,将异常信息放入消息队列中并发送信号,主进程在 CHECK_FOR_INTERRUPTS 时会检测到错误,进行异常处理;
序列化状态同步:主进程将一些内容序列化表示(如 GUC、快照等),通过动态共享内存传递给从进程;
用户自定义内容传递:用户可以将一些自定义的数据放入动态共享内存;
安全防护机制:并行框架严格限制只有 只读类型 SQL 才能使用。如果只是使用 SQL 的话,并行框架将是安全的;但是对于内核开发者而言,在其他代码中使用并行框架就需要遵守并行框架的限制。

如何使用并行框架

根据 PG 代码中 README.parallel
 介绍,使用并行框架需要遵循如下规范:
    /* 进入并行模式,防止不安全调用。例如在使用并行框架的时候出现写操作 */
    EnterParallelMode();

    /* 初始化 ParallelContext:存放本次并行计算的基本信息 */
    pcxt = CreateParallelContext("library_name", "function_name", nworkers);

    /* 给用户自定义内容预分配空间 */
    shm_toc_estimate_chunk(&pcxt->estimator, size);
    shm_toc_estimate_keys(&pcxt->estimator, keys);

    /* 创建动态共享内存,并将 GUC、Snapshot 这类的信息序列化后拷进去 */
    InitializeParallelDSM(pcxt);

    /* 将用户自定义内容插入动态共享内存中 */
    space = shm_toc_allocate(pcxt->toc, size);
    shm_toc_insert(pcxt->toc, key, space);

    /* 启动 background worker 进程(从进程)*/
    LaunchParallelWorkers(pcxt);

    /* do parallel stuff */

    /* 等待所有从进程退出 */
    WaitForParallelWorkersToFinish(pcxt);

    /* read any final results from dynamic shared memory */

    /* 清理 ParallelContext */
    DestroyParallelContext(pcxt);

    /* 退出并行模式 */
    ExitParallelMode();
    原理分析
    主进程——拉起 background worker
    主进程在启动从进程时,会调用 LaunchParallelWorkers 函数,这个函数会调用 RegisterDynamicBackgroundWorker ,此时会给 Postmaster 进程发一个信号,请求 Postmaster 进程启动一个 background 进程,Postmaster 进程收到信号后就会启动一个新后台进程。
    接下来我们仔细分析一下 LaunchParallelWorkers
     函数的核心代码。
      void
      LaunchParallelWorkers(ParallelContext *pcxt)
      {
      * 并行框架要求在不能启动从进程的情况下,也能正常运行 */
      if (pcxt->nworkers == 0 || pcxt->nworkers_to_launch == 0)
      return;

      *
      * 主进程需要成为 Lock Group Leader
      *
      * PG 认为:对于一个并行查询,主进程和它的从进程们为一个进程组,同一个进程组内部锁是共享的。
      * 即一个进程拿到 AccessExclusiveLock 的时候,同一进程组的另一个进程能拿到 AccessShareLock 锁。
      * 如果不这么做的话,会出现死锁。举个例子,主进程已经拿了一个 AccessExclusiveLock 锁,
      * 这时候从进程需要拿 AccessShareLock 才能完成工作。
      * 此时从进程无法拿到锁,没法结束工作;主进程因为从进程没有结束,不能释放锁,于是就 hang 住了。
      * 因此,并行框架引入了 lock group 的概念。同一个 lock group 中的进程不受锁排他性的影响。
      * 具体可参考 src/backend/storage/lmgr/README ,我会在后续的博客中介绍。
      */
      BecomeLockGroupLeader();

      * 如果需要启动 worker 的话,必须已经注册动态共享内存了*/
      Assert(pcxt->seg != NULL);

      * We might be running in a short-lived memory context. */
      oldcontext = MemoryContextSwitchTo(TopTransactionContext);

      * worker 信息初始化 */
      memset(&worker, 0, sizeof(worker));
      snprintf(worker.bgw_name, BGW_MAXLEN, "parallel worker for PID %d",
      MyProcPid);
      snprintf(worker.bgw_type, BGW_MAXLEN, "parallel worker");
      worker.bgw_flags =
      BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION
      | BGWORKER_CLASS_PARALLEL;
      worker.bgw_start_time = BgWorkerStart_ConsistentState;
      worker.bgw_restart_time = BGW_NEVER_RESTART;
      sprintf(worker.bgw_library_name, "postgres");
      sprintf(worker.bgw_function_name, "ParallelWorkerMain");
      worker.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(pcxt->seg));
      worker.bgw_notify_pid = MyProcPid;

      *
      * 启动进程,并行框架要求在不能启动从进程的情况下,也能正常运行。
      */
      for (i = 0; i < pcxt->nworkers_to_launch; ++i)
      {
      memcpy(worker.bgw_extra, &i, sizeof(int));
      if (!any_registrations_failed &&
      RegisterDynamicBackgroundWorker(&worker,
      &pcxt->worker[i].bgwhandle))
      {
      shm_mq_set_handle(pcxt->worker[i].error_mqh,
      pcxt->worker[i].bgwhandle);
      pcxt->nworkers_launched++;
      }
      else
      {
      *
      * 即使少启动了 worker ,也能正常运行。但是需要先 detach 该 worker 的错误消息队列,否则后续我们会一直等这个 worker 启动(hang 住)。
      */
      any_registrations_failed = true;
      pcxt->worker[i].bgwhandle = NULL;
      shm_mq_detach(pcxt->worker[i].error_mqh);
      pcxt->worker[i].error_mqh = NULL;
      }
      }

      *
      * Now that nworkers_launched has taken its final value, we can initialize
      * known_attached_workers.
      */
      if (pcxt->nworkers_launched > 0)
      {
      pcxt->known_attached_workers =
      palloc0(sizeof(bool) * pcxt->nworkers_launched);
      pcxt->nknown_attached_workers = 0;
      }


      /* Restore previous memory context. */
      MemoryContextSwitchTo(oldcontext);
      }

      从进程入口——ParallelWorkerMain
      ParallelWorkerMain 可以理解为从进程的入口函数,其函数调用栈为
      PostmasterMain->ServerLoop->maybe_start_bgworkers->StartBackgroundWorker->ParallelWorkerMain 。而用户定义的从进程 main 函数,将在 ParallelWorkerMain 中被调用。
      该函数主要就是从动态共享内存中读取一些已经序列化的主进程信息(GUC、Snapshot),成为 Lock Group Member (对应上文主进程是 Leader),然后连进数据库,通过消息队列发消息告诉主进程自己已经成功启动。

      异常处理机制
      回到一开始的问题,如果主进程拉起 n 个 background worker 进程进行并行查询。这时候如果其中的一个出现了错误,应该如何通知主进程呢?注意,这 n 个从进程可能不是一次拉起的,可能第一次拉几个,第二次再拉几个,对应多个 ParallelContext (下文简称 pcxt)。
      在上文中提到,动态共享内存中存放了一个消息队列,如果从进程出现异常,将异常信息放入消息队列中并发送信号,主进程在 CHECK_FOR_INTERRUPTS 时会检测到错误,进行异常处理。
      事实上,主进程会维护一个 pcxt_list,对于每一个从进程,将每一个从进程的信息存放在 pcxt_list 中。在调用 CHECK_FOR_INTERRUPTS 时,其内部会调用 HandleParallelMessages 函数。如果收到了从进程发来的信号,会遍历 pcxt_list ,如对于每一个 ParallelContext ,都会检查这个 ParallelContext 启动的所有 background worker 对应的错误消息队列,如果有异常信息就从消息队列中读取信息。


      总结

      本文介绍了 PG 中并行框架的相关概念和设计原理,对于其中的一些细节:如 Lock Group、动态共享内存、shm_mq 消息队列并没有做具体的介绍。这些将在后续的博客中进行详细的分析。




      PostgreSQL中文社区欢迎广大技术人员投稿
      投稿邮箱:press@postgres.cn


      文章转载自PostgreSQL中文社区,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

      评论