暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

go.uber.org/automaxprocs 源码分析

        我们知道在GMP模型中P的数量决定了并行运行的goroutine数量,runtime.GOMAXPROCS 在 Go 1.5 版本后的默认值是机器的 CPU 核数 (runtime.NumCPU),在runtime 包里有两个函数可以方便使用

    runtime.NumCPU() // 获取机器的CPU核心数
    runtime.GOMAXPROCS(0// 参数为零时用于获取给GOMAXPROCS设置的值,大于0的时候设置值

    但是在容器环境中,不同容器采用cgroup技术做cpu资源的隔离,runtime.NumCPU()获取的是宿主机的cpu数量。如果采用默认的runtime.GO

    MAXPROCS,在容器数量比较多的情况下会导致P的数量过多,导致go调度

    器不断切换线程,使得性能下降。如何解决呢?

           思路就是根据cgroup限制的容器cpu使用份额,来设置P的数量,如何获取给容器的分配的cpu资源呢?答案是读取proc文件系统里面的信息,获取cpu配额,然后通过总cpu个数和配额的确定真实可用的cpu数量,这就是go.uber.org/automaxprocs这个库干的事情。

            在分析源码之前,我们先学习下docker的基础知识:容器分配cpu配额有三种策略:Default(其实就是 CFS),Static (cpuset,和具体某几个核心绑定),Nolimit。使用 Default 的服务占据绝大多数,是通过cfs.cpu_period_us cfs.cpu_quota_us 两个参数来控制容器cgroup下cpu份额的。cfs.cpu_period_us 文件记录了调度周期,单位是 us;默认值一般是 100'000,即 100 ms;cfs.cpu_quota_us 记录了每个调度周期进程允许使用 cpu 的量,单位也是 us,值为 -1 表示无限制,对于 4C 的容器,这个值一般是 400'000。那么

      cfs.cpu_quota_us/cfs.cpu_period_us

      就是每个容器真实可用的cpu配额,即cpu的核心数量。这个公式也是这个库的核心原理。

              上面两个参数怎么获取呢,我们先起一个容器看一下

        docker run -it --cpu-period 100000 --cpu-quota 200000 debian:latest bin/bash

        其中参数--cpu-period  和--cpu-quota  就是指定上述两个参数的。在容器内我们通过/proc文件系统可以看到具体的值

          /# cat sys/fs/cgroup/cpu/cpu.cfs_period_us
          100000
          /# cat sys/fs/cgroup/cpu/cpu.cfs_quota_us
          200000

          上述两个文件的路径怎么获取呢?不同的平台、不同版本实现方式不一样,我们可以通过

            /proc/${pid}/cgroup  
            /proc/${pid}/mountinfo

            两个文件获取

              /# cat proc/self/mountinfo  |grep cpu
              587 581 0:33 docker/ba2b576a2e587dd7b4f572827ac885c7ec65c6819841adeefc84838266b0667f sys/fs/cgroup/cpu,cpuacct ro,nosuid,nodev,noexec,relatime master:10 - cgroup cgroup rw,cpu,cpuacct
              593 581 0:39 /docker/ba2b576a2e587dd7b4f572827ac885c7ec65c6819841adeefc84838266b0667f /sys/fs/cgroup/cpuset ro,nosuid,nodev,noexec,relatime master:16 - cgroup cgroup rw,cpuset
                /# cat /proc/self/cgroup
                12:cpuset:/docker/ba2b576a2e587dd7b4f572827ac885c7ec65c6819841adeefc84838266b0667f
                11:devices:/docker/ba2b576a2e587dd7b4f572827ac885c7ec65c6819841adeefc84838266b0667f
                10:perf_event:/docker/ba2b576a2e587dd7b4f572827ac885c7ec65c6819841adeefc84838266b0667f
                9:hugetlb:/docker/ba2b576a2e587dd7b4f572827ac885c7ec65c6819841adeefc84838266b0667f
                8:rdma:/
                7:freezer:/docker/ba2b576a2e587dd7b4f572827ac885c7ec65c6819841adeefc84838266b0667f
                6:cpu,cpuacct:/docker/ba2b576a2e587dd7b4f572827ac885c7ec65c6819841adeefc84838266b0667f
                5:pids:/docker/ba2b576a2e587dd7b4f572827ac885c7ec65c6819841adeefc84838266b0667f
                4:net_cls,net_prio:/docker/ba2b576a2e587dd7b4f572827ac885c7ec65c6819841adeefc84838266b0667f
                3:memory:/docker/ba2b576a2e587dd7b4f572827ac885c7ec65c6819841adeefc84838266b0667f
                2:blkio:/docker/ba2b576a2e587dd7b4f572827ac885c7ec65c6819841adeefc84838266b0667f
                1:name=systemd:/docker/ba2b576a2e587dd7b4f572827ac885c7ec65c6819841adeefc84838266b0667f

                可以看到/proc/${pid}/cgroup 文件每行以冒号分割,共列,他们的含义分别是:

                • 1,cgroup 树的 ID, 和 /proc/cgroups 文件中的 ID 一一对应。

                • 2,和 cgroup 树绑定的所有 subsystem,多个 subsystem 之间用逗号隔开。name=systemd 表示没有和任何 subsystem 绑定

                • 3,进程在 cgroup 树中的路径,即进程所属的 cgroup,这个路径是相对于挂载点的相对路径。

                路径是我们重点关注的,可以看到cpu是直接放在挂载点下面的。

                /proc/${pid}/mountinfo每条记录的字段用空格分隔,字段 - 表示后面都是 options

                共有三个字段需要我们关心:

                • 1,第四列:组成当前挂载点根路径的文件系统的路径

                • 2,第五列:当前挂载点相对于进程根目录的路径

                • 3,- 字段之后的第一个字段,代表 filesystem type

                可以看到我们的路径是/sys/fs/cgroup/cpu 对应文件系统是cgroup

                结合上述两个信息就可以找到我们的cpu配额文件的路径。不过需要注意的是,上面看到的信息我是在linux机器看到的。对于docker for mac 来说,它是运行在xhyve虚拟机上的,对应的路径是不一样的,并且

                  /proc/${pid}/cgroup  
                  /proc/${pid}/mountinfo

                  的信息也不一样,下面是我在本机实验的结果

                    /# cat /proc/self/cgroup
                    0::/
                    /# cat /proc/self/mountinfo
                    1640 1094 0:323 / / rw,relatime master:386 - overlay overlay rw,lowerdir=/var/lib/docker/overlay2/l/BBWBG35DB6VBUK3Z3WETTXC5YX:/var/lib/docker/overlay2/l/IHJT3IENJZVUMNOHTFAFQE3TMV,upperdir=/var/lib/docker/overlay2/ccc9ad6ccca1fb6e99945af53ef2896fd2493f3d7cc38fc463082d5858dffc20/diff,workdir=/var/lib/docker/overlay2/ccc9ad6ccca1fb6e99945af53ef2896fd2493f3d7cc38fc463082d5858dffc20/work
                    1641 1640 0:360 / /proc rw,nosuid,nodev,noexec,relatime - proc proc rw

                    里面没有上述获取cpu配额文件相关的信息,cpu配额信息在哪里呢,在另外一个文件里

                      /# cat /sys/fs/cgroup/cpu.max
                      200000 100000

                      由此可以推断,go.uber.org/automaxprocs 在docker for mac 上是不生效的,原因可以通过后面源码分析知晓。

                              同时补充一个知识点,linux的cgroup实现了两个版本,v1版本的多层级(hierarchy)设计导致进程的管理较为混乱,控制器之间行为不一致、接口不统一,因此新版linux采用了cgroupV2,对应的go.uber.org/automaxprocs 也兼容了上述两个版本。

                              go.uber.org/automaxprocs 如何使用呢?example_test.go的例子可以看到非常简单,import即可

                        import _ "go.uber.org/automaxprocs"

                                它执行了automaxprocs.go的init函数

                          import (
                            "log"
                          "go.uber.org/automaxprocs/maxprocs"



                          func init() {
                          maxprocs.Set(maxprocs.Logger(log.Printf))
                          }

                          它调用了maxprocs/maxprocs.go的set函数

                            func Set(opts ...Option) (func(), error) {
                              cfg := &config{
                            procs: iruntime.CPUQuotaToGOMAXPROCS,
                            minGOMAXPROCS: 1,
                            }
                              for _, o := range opts {
                            o.apply(cfg)
                            }
                                undoNoop := func() {
                            cfg.log("maxprocs: No GOMAXPROCS change to reset")
                            }
                            if max, exists := os.LookupEnv(_maxProcsKey); exists {
                            cfg.log("maxprocs: Honoring GOMAXPROCS=%q as set in environment", max)
                            return undoNoop, nil
                            }
                            maxProcs, status, err := cfg.procs(cfg.minGOMAXPROCS)
                            prev := currentMaxProcs()
                               
                               undo := func() {
                            cfg.log("maxprocs: Resetting GOMAXPROCS to %v", prev)
                            runtime.GOMAXPROCS(prev)
                               }

                            runtime.GOMAXPROCS(maxProcs)
                            return undo, nil

                            其中config定义如下

                              type config struct {
                              printf func(string, ...interface{})
                              procs func(int) (int, iruntime.CPUQuotaStatus, error)
                              minGOMAXPROCS int
                              }

                              函数干了下面四件事情:

                              • 1,通过currentMaxProcs()获取当前设置的p的数量

                              • 2,通过iruntime.CPUQuotaToGOMAXPROCS计算容器里最大cpu核数

                              • 3,通过runtime.GOMAXPROCS(maxProcs)设置P的数量

                              • 4,返回undo函数,可以通过undo函数恢复初始P的数量

                              接着我们一一分析每一步的源码实现:

                              1,获取当前设置的最大P的数量:直接调用的runtime的函数

                                 func currentMaxProcs() int {
                                return runtime.GOMAXPROCS(0)
                                }

                                可以跟下代码go/src/runtime/debug.go

                                  func GOMAXPROCS(n int) int {
                                  if GOARCH == "wasm" && n > 1 {
                                  n = 1
                                  lock(&sched.lock)
                                  ret := int(gomaxprocs)
                                  unlock(&sched.lock)
                                  if n <= 0 || n == ret {
                                  return ret
                                  }
                                    stopTheWorldGC("GOMAXPROCS")
                                    newprocs = int32(n)


                                  startTheWorldGC()
                                  return ret

                                  如果是小于等于0,返回当前的最大P值,否则修改全局变量newprocs 的值为n然后通过startTheWorldGC() 使得修改生效。其中全局变量定义如下:go/src/runtime/runtime2.go

                                        var (
                                    allm *m
                                    gomaxprocs int32
                                    ncpu int32
                                    forcegc forcegcstate
                                    sched schedt
                                    newprocs int32

                                    接着看下startTheWorldGC 相关的核心代码 go/src/runtime/proc.go

                                      func startTheWorldWithSema(emitTraceEvent bool) int64 {
                                         procs := gomaxprocs
                                      if newprocs != 0 {
                                      procs = newprocs
                                      newprocs = 0
                                      }
                                        p1 := procresize(procs)  

                                      gomaxprocs替换了全局变量procs的值,然后根据它的值增加或者删除P

                                        func procresize(nprocs int32) *p {
                                        for i := old; i < nprocs; i++ {
                                        pp := allp[i]
                                        if pp == nil {
                                        pp = new(p)
                                        }

                                        for i := nprocs; i < old; i++ {
                                        p := allp[i]
                                        p.destroy()

                                        2,iruntime.CPUQuotaToGOMAXPROCS 实现位于

                                        internal/runtime/cpu_quota_unsupported.go

                                          //go:build !linux
                                          // +build !linux
                                          func CPUQuotaToGOMAXPROCS(_ int) (int, CPUQuotaStatus, error) {
                                          return -1, CPUQuotaUndefined, nil
                                          }

                                          其中枚举值定义在internal/runtime/runtime.go

                                            type CPUQuotaStatus int
                                            const (
                                            // CPUQuotaUndefined is returned when CPU quota is undefined
                                            CPUQuotaUndefined CPUQuotaStatus = iota
                                            // CPUQuotaUsed is returned when a valid CPU quota can be used
                                            CPUQuotaUsed
                                            // CPUQuotaMinUsed is return when CPU quota is smaller than the min value
                                            CPUQuotaMinUsed
                                            )

                                            我们可以看到,对应的实现只支持linux版本,并且实现了两个版本兼容两个版本的cgroup,internal/runtime/cpu_quota_linux.go

                                              import (
                                              "errors"
                                                "math"
                                                
                                              cg "go.uber.org/automaxprocs/internal/cgroups"
                                               )
                                               
                                                func CPUQuotaToGOMAXPROCS(minValue int) (int, CPUQuotaStatus, error) {
                                              cgroups, err := newQueryer()
                                              quota, defined, err := cgroups.CPUQuota()
                                              maxProcs := int(math.Floor(quota))
                                                func newQueryer() (queryer, error) {
                                                cgroups, err := _newCgroups2()
                                                if errors.Is(err, cg.ErrNotV2) {
                                                return _newCgroups()
                                                  type queryer interface {
                                                  CPUQuota() (float64, bool, error)
                                                  }
                                                    var (
                                                    _newCgroups2 = cg.NewCGroups2ForCurrentProcess
                                                    _newCgroups = cg.NewCGroupsForCurrentProcess
                                                    )

                                                    先看下v2:internal/cgroups/cgroups2.go

                                                      _cgroupv2CPUMax = "cpu.max"
                                                      _cgroupv2FSType = "cgroup2"


                                                      _cgroupv2MountPoint = "/sys/fs/cgroup"
                                                        type CGroups2 struct {
                                                        mountPoint string
                                                        cpuMaxFile string
                                                        }
                                                          func NewCGroups2ForCurrentProcess() (*CGroups2, error) {
                                                          return newCGroups2FromMountInfo(_procPathMountInfo)
                                                          }
                                                            func newCGroups2FromMountInfo(mountInfoPath string) (*CGroups2, error) {
                                                            isV2, err := isCGroupV2(mountInfoPath)
                                                            return &CGroups2{
                                                            mountPoint: _cgroupv2MountPoint,
                                                            cpuMaxFile: _cgroupv2CPUMax,
                                                            }, nil
                                                              func isCGroupV2(procPathMountInfo string) (bool, error) {
                                                              var (
                                                              isV2 bool
                                                              newMountPoint = func(mp *MountPoint) error {
                                                              isV2 = isV2 || (mp.FSType == _cgroupv2FSType && mp.MountPoint == _cgroupv2MountPoint)
                                                              return nil
                                                              }
                                                              if err := parseMountInfo(procPathMountInfo, newMountPoint); err != nil {

                                                              获取配额的方法

                                                                func (cg *CGroups2) CPUQuota() (float64, bool, error) {
                                                                cpuMaxParams, err := os.Open(path.Join(cg.mountPoint, cg.cpuMaxFile))
                                                                scanner := bufio.NewScanner(cpuMaxParams)
                                                                max, err := strconv.Atoi(fields[_cgroupv2CPUMaxQuotaIndex])
                                                                if len(fields) == 1 {
                                                                period = _cgroupV2CPUMaxDefaultPeriod
                                                                } else {
                                                                period, err = strconv.Atoi(fields[_cgroupv2CPUMaxPeriodIndex])
                                                                return float64(max) / float64(period), true, nil
                                                                  const (
                                                                  _cgroupv2CPUMaxQuotaIndex = iota
                                                                  _cgroupv2CPUMaxPeriodIndex
                                                                  )

                                                                  cgroup的解析代码位于:internal/cgroups/cgroup.go

                                                                        //go:build linux
                                                                    // +build linux
                                                                    type CGroup struct {
                                                                    path string
                                                                    }
                                                                      func NewCGroup(path string) *CGroup {
                                                                      return &CGroup{path: path}
                                                                      }
                                                                        func (cg *CGroup) ParamPath(param string) string {
                                                                        return filepath.Join(cg.path, param)
                                                                        }
                                                                          func (cg *CGroup) readFirstLine(param string) (string, error) {
                                                                          paramFile, err := os.Open(cg.ParamPath(param))
                                                                          if err != nil {
                                                                            func (cg *CGroup) readInt(param string) (int, error) {
                                                                            text, err := cg.readFirstLine(param)

                                                                            cgroup v1 的代码位于internal/cgroups/cgroups.go

                                                                              // _cgroupCPUCFSQuotaUsParam is the file name for the CGroup CFS quota
                                                                              // parameter.
                                                                              _cgroupCPUCFSQuotaUsParam = "cpu.cfs_quota_us"
                                                                              // _cgroupCPUCFSPeriodUsParam is the file name for the CGroup CFS period
                                                                              // parameter.
                                                                              _cgroupCPUCFSPeriodUsParam = "cpu.cfs_period_us"
                                                                              )
                                                                                const (
                                                                                _procPathCGroup = "/proc/self/cgroup"
                                                                                _procPathMountInfo = "/proc/self/mountinfo"
                                                                                )
                                                                                  type CGroups map[string]*CGroup
                                                                                    func NewCGroups(procPathMountInfo, procPathCGroup string) (CGroups, error) {
                                                                                    cgroupSubsystems, err := parseCGroupSubsystems(procPathCGroup)
                                                                                    newMountPoint := func(mp *MountPoint) error {
                                                                                    cgroupPath, err := mp.Translate(subsys.Name)
                                                                                    cgroups[opt] = NewCGroup(cgroupPath)
                                                                                    if err := parseMountInfo(procPathMountInfo, newMountPoint); err !=
                                                                                      func NewCGroupsForCurrentProcess() (CGroups, error) {
                                                                                      return NewCGroups(_procPathMountInfo, _procPathCGroup)
                                                                                      }

                                                                                      计算方式也是类似的

                                                                                        func (cg CGroups) CPUQuota() (float64, bool, error) {
                                                                                        cpuCGroup, exists := cg[_cgroupSubsysCPU]
                                                                                        cfsQuotaUs, err := cpuCGroup.readInt(_cgroupCPUCFSQuotaUsParam)
                                                                                        cfsPeriodUs, err := cpuCGroup.readInt(_cgroupCPUCFSPeriodUsParam)
                                                                                        return float64(cfsQuotaUs) / float64(cfsPeriodUs), true, nil

                                                                                        解析挂载信息internal/cgroups/mountpoint.go

                                                                                          type MountPoint struct {
                                                                                          MountID int
                                                                                          ParentID int
                                                                                          DeviceID string
                                                                                          Root string
                                                                                          MountPoint string
                                                                                          Options []string
                                                                                          OptionalFields []string
                                                                                          FSType string
                                                                                          MountSource string
                                                                                          SuperOptions []string
                                                                                          }
                                                                                            func NewMountPointFromLine(line string) (*MountPoint, error) {
                                                                                            fields := strings.Split(line, _mountInfoSep)
                                                                                            mountID, err := strconv.Atoi(fields[_miFieldIDMountID])
                                                                                            parentID, err := strconv.Atoi(fields[_miFieldIDParentID])
                                                                                            for i, field := range fields[_miFieldIDOptionalFields:] {
                                                                                            if field == _mountInfoOptionalFieldsSep {
                                                                                            return &MountPoint{
                                                                                              func (mp *MountPoint) Translate(absPath string) (string, error) {
                                                                                                func parseMountInfo(procPathMountInfo string, newMountPoint func(*MountPoint) error) error {
                                                                                                mountInfoFile, err := os.Open(procPathMountInfo)
                                                                                                scanner := bufio.NewScanner(mountInfoFile)
                                                                                                for scanner.Scan() {
                                                                                                mountPoint, err := NewMountPointFromLine(scanner.Text())
                                                                                                if err := newMountPoint(mountPoint); err != nil {

                                                                                                internal/cgroups/subsys.go 分隔符

                                                                                                  const (
                                                                                                  _cgroupSep = ":"
                                                                                                  _cgroupSubsysSep = ","
                                                                                                  )

                                                                                                  字段位置

                                                                                                    const (
                                                                                                    _csFieldIDID = iota
                                                                                                    _csFieldIDSubsystems
                                                                                                    _csFieldIDName
                                                                                                    _csFieldCount
                                                                                                    )
                                                                                                      func NewCGroupSubsysFromLine(line string) (*CGroupSubsys, error) {
                                                                                                      fields := strings.SplitN(line, _cgroupSep, _csFieldCount)
                                                                                                      id, err := strconv.Atoi(fields[_csFieldIDID])
                                                                                                      cgroup := &CGroupSubsys{
                                                                                                      ID: id,
                                                                                                      Subsystems: strings.Split(fields[_csFieldIDSubsystems], _cgroupSubsysSep),
                                                                                                      Name: fields[_csFieldIDName],
                                                                                                      }

                                                                                                      解析cgroup信息

                                                                                                        func parseCGroupSubsystems(procPathCGroup string) (map[string]*CGroupSubsys, error) {
                                                                                                        cgroupFile, err := os.Open(procPathCGroup)
                                                                                                        scanner := bufio.NewScanner(cgroupFile)
                                                                                                        for scanner.Scan() {
                                                                                                        cgroup, err := NewCGroupSubsysFromLine(scanner.Text())
                                                                                                        for _, subsys := range cgroup.Subsystems {
                                                                                                        subsystems[subsys] = cgroup

                                                                                                        文章转载自golang算法架构leetcode技术php,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                                                                                                        评论