暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

golang 源码分析:mc,minio-go

        对象存储服务(Object Storage Service,OSS)是一种海量、安全、低成本、高可靠的云存储服务,适合存放任意类型的文件。容量和处理能力弹性扩展,多种存储类型供选择,全面优化存储成本。MinIO 除了直接作为对象存储使用,还可以作为云上对象存储服务的网关层,无缝对接到 Amazon S3、Microsoft Azure。在学习minio的源码之前,先阅读下minio的客户端mc和golang sdk minio-go的源码。

    https://github.com/minio/minio-go
    https://github.com/minio/mc

    1,MC

            mc是在 minio-go的基础上做了命令行的包装,常用的命令如下:

      ls       列出文件和文件夹。
      mb 创建一个存储桶或一个文件夹。
      cat 显示文件和对象内容。
      pipe 将一个STDIN重定向到一个对象或者文件或者STDOUT。
      share 生成用于共享的URL。
      cp 拷贝文件和对象。
      mirror 给存储桶和文件夹做镜像。
      find 基于参数查找文件。
      diff 对两个文件夹或者存储桶比较差异。
      rm 删除文件和对象。
      events 管理对象通知。
      watch 监听文件和对象的事件。
      policy 管理访问策略。
      session 为cp命令管理保存的会话。
      config 管理mc配置文件。
      update 检查软件更新。
      version 输出版本信息。

      当然,入口函数还是main.go:

        func main() {
        mc.Main(os.Args)
        }

        对应的每一个命令的实现在cmd目录下,由于mc的命令有自己特殊的模板,所以它没有用常用的cobra,而是自己定义了一套minio/cli,首先我们看下Main函数,它定义在cmd/main.go中

          func Main(args []string) 
          mainComplete()
          defer profile.Start(profile.CPUProfile, profile.ProfilePath(mustGetProfileDir())).Stop()
          probe.Init()
          if err := registerApp(appName).Run(args); err != nil

                      mainComplete定义在cmd/auto-complete.go

            func mainComplete() error 
               for _, cmd := range appCmds {
            if cmd.Hidden {
            continue
            }
            complCmds[cmd.Name] = cmdToCompleteCmd(cmd, "")
            }
               mcComplete := complete.Command{
            Sub: complCmds,
            GlobalFlags: complFlags,
            }
              complete.New(filepath.Base(os.Args[0]), mcComplete).Run()

            它把appCmds里面未隐藏的命令注册为mc命令行自动补全的子命令,appCmds是子命令列表,定义在cmd/main.go中:

              var appCmds = []cli.Command{
              aliasCmd,
              lsCmd,
              mbCmd,
              rbCmd,
              cpCmd,
              mirrorCmd,
              catCmd,
              headCmd,
              pipeCmd,
              shareCmd,
              findCmd,
              sqlCmd,
              statCmd,
              mvCmd,
              treeCmd,
              duCmd,
              retentionCmd,
              legalHoldCmd,
              diffCmd,
              rmCmd,
              versionCmd,
              ilmCmd,
              encryptCmd,
              eventCmd,
              watchCmd,
              undoCmd,
              anonymousCmd,
              policyCmd,
              tagCmd,
              replicateCmd,
              adminCmd,
              configCmd,
              updateCmd,
              }

              probe定义在pkg/probe/probe.go

                    func Init() {
                _, file, _, _ := runtime.Caller(1)
                rootPath = filepath.Dir(file)

                其中的Run方法是minio/cli框架的接口,App和Command各有一个实现,分别定义在:

                minio/cli@v1.22.0/app.go

                  func (a *App) Run(arguments []string) (err error) 
                  a.Setup()
                  c := a.Command(name)
                       if c != nil {
                  return c.Run(context)
                  }

                  minio/cli@v1.22.0/command.go

                    func (c Command) Run(ctx *Context) (err error) 
                    err = HandleAction(c.Action, context)

                    下面以tree命令为例,看下实现的细节: cmd/tree-main.go

                      var treeCmd = cli.Command{
                      Name: "tree",
                      Usage: "list buckets and objects in a tree format",
                      Action: mainTree,
                      OnUsageError: onUsageError,
                      Before: setGlobalsFromContext,
                      Flags: append(treeFlags, globalFlags...),
                      CustomHelpTemplate

                      对应执行的命令是mainTree

                        func mainTree(cliCtx *cli.Context) error
                        args, depth, includeFiles, timeRef := parseTreeSyntax(ctx, cliCtx)
                        if e := doTree(ctx, targetURL, timeRef, 1, false, "", depth, includeFiles); e != nil
                        clnt, err := newClientFromAlias(targetAlias, targetURL)
                        e := doList(ctx, clnt, true, false, false, timeRef, false);

                        在doTree方法里初始化了一个client,调用了对应的list接口:

                          func doTree(ctx context.Context, url string, timeRef time.Time, level int, leaf bool, branchString string, depth int, includeFiles bool) error
                          clnt, err := newClientFromAlias(targetAlias, targetURL)
                          show := func(end bool) error
                          for content := range clnt.List(ctx, ListOptions{Recursive: false, TimeRef: timeRef, ShowDir: DirFirst})

                          Client是一个客户端接口,定义在cmd/client.go:

                            type Client interface {
                            // Common operations
                              Stat(ctx context.Context, opts StatOptions) (content *ClientContent, err *probe.Error)
                            List(ctx context.Context, opts ListOptions) <-chan *ClientContent

                            Client接口对应有两个具体实现,一个是本地文件系统(fsClient),一个是S3(S3Client):

                            cmd/client-fs.go

                              func (f *fsClient) List(ctx context.Context, opts ListOptions) <-chan *ClientContent
                              go f.listRecursiveInRoutine(contentCh, opts.WithMetadata)
                              go f.listDirOpt(contentCh, opts.Incomplete, opts.WithMetadata, opts.ShowDir)
                              go f.listInRoutine(contentCh, opts.WithMetadata)

                              通过walk方法递归调用visit方法做树状渲染展示:

                                func (f *fsClient) listRecursiveInRoutine(contentCh chan *ClientContent, isMetadata bool) 
                                visitFS := func(fp string, fi os.FileInfo, e error) error
                                e := xfilepath.Walk(dirName, visitFS)

                                cmd/client-s3.go

                                  func (c *S3Client) List(ctx context.Context, opts ListOptions) <-chan *ClientContent
                                  c.versionedList(ctx, contentCh, opts)
                                  c.unversionedList(ctx, contentCh, opts)
                                    func (c *S3Client) versionedList(ctx context.Context, contentCh chan *ClientContent, opts ListOptions) 
                                    b, o := c.url2BucketAndObject()
                                      func (c *S3Client) url2BucketAndObject() (bucketName, objectName string) 
                                      buckets, err := c.api.ListBuckets(ctx)
                                      for _, bucket := range buckets {
                                      contentCh <- c.bucketInfo2ClientContent(bucket)
                                      for objectVersion := range c.listVersions(ctx, bucket.Name, "",
                                      opts.Recursive, opts.TimeRef, opts.WithOlderVersions, opts.WithDeleteMarkers)
                                      c.unversionedList(ctx, contentCh, opts)
                                        func (c *S3Client) unversionedList(ctx context.Context, contentCh chan *ClientContent, opts ListOptions) 
                                        c.listIncompleteRecursiveInRoutine(ctx, contentCh, opts)
                                        c.listIncompleteInRoutine(ctx, contentCh, opts)
                                        c.listRecursiveInRoutine(ctx, contentCh, opts)
                                        c.listInRoutine(ctx, contentCh, opts)
                                          func (c *S3Client) listIncompleteRecursiveInRoutine(ctx context.Context, contentCh chan *ClientContent, opts ListOptions)
                                          buckets, err := c.api.ListBuckets(ctx)
                                          for _, bucket := range buckets {
                                          for object := range c.api.ListIncompleteUploads(ctx, bucket.Name, o, isRecursive)
                                            func (c *S3Client) listRecursiveInRoutine(ctx context.Context, contentCh chan *ClientContent, opts ListOptions)
                                            buckets, err := c.api.ListBuckets(ctx)
                                            for object := range c.listObjectWrapper(ctx, bucket.Name, o, isRecursive, time.Time{}, false, false, opts.WithMetadata, -1)
                                              func (c *S3Client) listObjectWrapper(ctx context.Context, bucket, object string, isRecursive bool, timeRef time.Time, withVersions, withDeleteMarkers bool, metadata bool, maxKeys int) <-chan minio.ObjectInfo 
                                              return c.listVersions(ctx, bucket, object, isRecursive, timeRef, withVersions, withDeleteMarkers)
                                              c.api.ListObjects(ctx, bucket, minio.ListObjectsOptions{Prefix: object, Recursive: isRecursive, UseV1: true, MaxKeys: maxKeys})
                                              c.api.ListObjects(ctx, bucket, minio.ListObjectsOptions{Prefix: object, Recursive: isRecursive, WithMetadata: metadata, MaxKeys: maxKeys})

                                              最终都是调用了SDK中的对应方法:

                                              minio/minio-go/v7@v7.0.16-0.20211108161804-a7a36ee131df/api-list.go

                                                func (c *Client) ListObjects(ctx context.Context, bucketName string, opts ListObjectsOptions) <-chan ObjectInfo
                                                  func (c *Client) ListIncompleteUploads(ctx context.Context, bucketName, objectPrefix string, recursive bool) <-chan ObjectMultipartInfo {
                                                  return c.listIncompleteUploads(ctx, bucketName, objectPrefix, recursive)
                                                  }

                                                  我们看下另外一个ls命令,实现也是类似的:

                                                  cmd/ls-main.go

                                                    var lsCmd = cli.Command{
                                                    Name: "ls",
                                                    Usage: "list buckets and objects",
                                                    Action: mainList,
                                                    OnUsageError: onUsageError,
                                                    Before: setGlobalsFromContext,
                                                    Flags: append(lsFlags, globalFlags...),
                                                    CustomHelpTemplate:
                                                      func mainList(cliCtx *cli.Context) error
                                                      if e := doList(ctx, clnt, isRecursive, isIncomplete, isSummary, timeRef, withOlderVersions); e != nil

                                                      cmd/ls.go

                                                        func doList(ctx context.Context, clnt Client, isRecursive, isIncomplete, isSummary bool, timeRef time.Time, withOlderVersions bool) error
                                                        for content := range clnt.List(ctx, ListOptions{
                                                        Recursive: isRecursive,
                                                        Incomplete: isIncomplete,
                                                        TimeRef: timeRef,
                                                        WithOlderVersions: withOlderVersions || !timeRef.IsZero(),
                                                        WithDeleteMarkers: true,
                                                        ShowDir: DirNone,
                                                        })

                                                         2,SDK:minio-go

                                                                首先我们看下sdk是如何使用的:

                                                        1,创建client对象:

                                                          minioClient, err := minio.New(endpoint, &minio.Options{
                                                          Creds: credentials.NewStaticV4(accessKeyID, secretAccessKey, ""),
                                                          Secure: useSSL,
                                                          })

                                                          2,创建bucket,或者确认bucket是否存在:

                                                            err = minioClient.MakeBucket(ctx, bucketName, minio.MakeBucketOptions{Region: location})
                                                              exists, errBucketExists := minioClient.BucketExists(ctx, bucketName)

                                                              3,上传文件:

                                                                info, err := minioClient.FPutObject(ctx, bucketName, objectName, filePath, minio.PutObjectOptions{ContentType: contentType})

                                                                client对象定义在api.go

                                                                  type Client struct {
                                                                  /// Standard options.




                                                                  // Parsed endpoint url provided by the user.
                                                                  endpointURL *url.URL




                                                                  // Holds various credential providers.
                                                                  credsProvider *credentials.Credentials




                                                                  // Custom signerType value overrides all credentials.
                                                                  overrideSignerType credentials.SignatureType




                                                                  // User supplied.
                                                                  appInfo struct {
                                                                  appName string
                                                                  appVersion string
                                                                  }




                                                                  // Indicate whether we are using https or not
                                                                  secure bool




                                                                  // Needs allocation.
                                                                  httpClient *http.Client
                                                                  bucketLocCache *bucketLocationCache




                                                                  // Advanced functionality.
                                                                  isTraceEnabled bool
                                                                  traceErrorsOnly bool
                                                                  traceOutput io.Writer




                                                                  // S3 specific accelerated endpoint.
                                                                  s3AccelerateEndpoint string




                                                                  // Region endpoint
                                                                  region string




                                                                  // Random seed.
                                                                  random *rand.Rand




                                                                  // lookup indicates type of url lookup supported by server. If not specified,
                                                                  // default to Auto.
                                                                  lookup BucketLookupType




                                                                  // Factory for MD5 hash functions.
                                                                  md5Hasher func() md5simd.Hasher
                                                                  sha256Hasher func() md5simd.Hasher




                                                                  healthStatus int32
                                                                  }
                                                                    func New(endpoint string, opts *Options) (*Client, error) 

                                                                    这个文件下还定义了一个executeMethod方法,这个方法是所有http请求的入口:

                                                                      func (c *Client) executeMethod(ctx context.Context, method string, metadata requestMetadata) (res *http.Response, err error) 
                                                                      bodyCloser, ok := metadata.contentBody.(io.Closer)
                                                                      for range c.newRetryTimer(retryCtx, reqRetry, DefaultRetryUnit, DefaultRetryCap, MaxJitter)
                                                                      req, err = c.newRequest(ctx, method, metadata)
                                                                      res, err = c.do(req)

                                                                      do方法简单包装了http client的do方法:

                                                                        func (c *Client) do(req *http.Request) (resp *http.Response, err error) 


                                                                        resp, err = c.httpClient.Do(req)

                                                                        api-put-bucket.go里定义了创建bucket的方法:

                                                                          func (c *Client) MakeBucket(ctx context.Context, bucketName string, opts MakeBucketOptions) (err error) {
                                                                          return c.makeBucket(ctx, bucketName, opts)
                                                                          }
                                                                            func (c *Client) makeBucket(ctx context.Context, bucketName string, opts MakeBucketOptions) (err error)


                                                                            err = c.doMakeBucket(ctx, bucketName, opts.Region, opts.ObjectLocking)

                                                                            最终调用了上述executeMethod方法:

                                                                              func (c *Client) doMakeBucket(ctx context.Context, bucketName string, location string, objectLockEnabled bool) (err error) 
                                                                              createBucketConfigBytes, err = xml.Marshal(createBucketConfig)
                                                                              reqMetadata.contentMD5Base64 = sumMD5Base64(createBucketConfigBytes)
                                                                              resp, err := c.executeMethod(ctx, http.MethodPut, reqMetadata)

                                                                              BucketExists方法定义在api-stat.go

                                                                                func (c *Client) BucketExists(ctx context.Context, bucketName string) (bool, error)
                                                                                resp, err := c.executeMethod(ctx, http.MethodHead, requestMetadata{
                                                                                bucketName: bucketName,
                                                                                contentSHA256Hex: emptySHA256Hex,
                                                                                })

                                                                                FPutObject定义在:api-put-object-file-context.go

                                                                                  func (c *Client) FPutObject(ctx context.Context, bucketName, objectName, filePath string, opts PutObjectOptions) (info UploadInfo, err error) 
                                                                                  fileReader, err := os.Open(filePath)
                                                                                  fileStat, err := fileReader.Stat()
                                                                                  fileSize := fileStat.Size()
                                                                                  return c.PutObject(ctx, bucketName, objectName, fileReader, fileSize, opts)

                                                                                  api-put-object.go

                                                                                    func (c *Client) PutObject(ctx context.Context, bucketName, objectName string, reader io.Reader, objectSize int64,
                                                                                    opts PutObjectOptions) (info UploadInfo, err error)
                                                                                    return c.putObjectCommon(ctx, bucketName, objectName, reader, objectSize, opts)

                                                                                    根据大小和url的类型确定上传方式,可以整体也可以分片,还可以流式

                                                                                      func (c *Client) putObjectCommon(ctx context.Context, bucketName, objectName string, reader io.Reader, size int64, opts PutObjectOptions) (info UploadInfo, err error)


                                                                                      if size > int64(maxMultipartPutObjectSize)
                                                                                      if s3utils.IsGoogleEndpoint(*c.endpointURL) {
                                                                                      return c.putObject(ctx, bucketName, objectName, reader, size, opts)
                                                                                      if c.overrideSignerType.IsV2() {
                                                                                      if size >= 0 && size < int64(partSize) || opts.DisableMultipart {
                                                                                      return c.putObject(ctx, bucketName, objectName, reader, size, opts)
                                                                                      }
                                                                                      return c.putObjectMultipart(ctx, bucketName, objectName, reader, size, opts)
                                                                                      }
                                                                                      return c.putObjectMultipartStream(ctx, bucketName, objectName, reader, size, opts)

                                                                                      这里的maxMultipartPutObjectSize是一个常量,限制单个对象最大允许5TiB大小。

                                                                                      api-put-object-streaming.go

                                                                                        func (c *Client) putObject(ctx context.Context, bucketName, objectName string, reader io.Reader, size int64, opts PutObjectOptions) (info UploadInfo, err error) 
                                                                                        return c.putObjectDo(ctx, bucketName, objectName, readSeeker, md5Base64, "", size, opts)
                                                                                          func (c *Client) putObjectDo(ctx context.Context, bucketName, objectName string, reader io.Reader, md5Base64, sha256Hex string, size int64, opts PutObjectOptions) (UploadInfo, error)
                                                                                              resp, err := c.executeMethod(ctx, http.MethodPut, reqMetadata)

                                                                                          api-put-object-multipart.go

                                                                                            func (c *Client) putObjectMultipart(ctx context.Context, bucketName, objectName string, reader io.Reader, size int64,
                                                                                            opts PutObjectOptions) (info UploadInfo, err error)
                                                                                            info, err = c.putObjectMultipartNoStream(ctx, bucketName, objectName, reader, opts)
                                                                                            return c.putObject(ctx, bucketName, objectName, reader, size, opts)

                                                                                            分片上传根据分片大小,计算出分片数目,然后创建上传的id,逐个上传分片,最后合并分片:

                                                                                              func (c *Client) putObjectMultipartNoStream(ctx context.Context, bucketName, objectName string, reader io.Reader, opts PutObjectOptions) (info UploadInfo, err error) 
                                                                                              totalPartsCount, partSize, _, err := OptimalPartInfo(-1, opts.PartSize)
                                                                                              uploadID, err := c.newUploadID(ctx, bucketName, objectName, opts)
                                                                                              for partNumber <= totalPartsCount {
                                                                                              objPart, uerr := c.uploadPart(ctx, bucketName, objectName, uploadID, rd, partNumber,
                                                                                              md5Base64, sha256Hex, int64(length), opts.ServerSideEncryption)
                                                                                              uploadInfo, err := c.completeMultipartUpload(ctx, bucketName, objectName, uploadID, complMultipartUpload, PutObjectOptions{})
                                                                                                func (c *Client) uploadPart(ctx context.Context, bucketName, objectName, uploadID string, reader io.Reader,
                                                                                                partNumber int, md5Base64, sha256Hex string, size int64, sse encrypt.ServerSide) (ObjectPart, error)
                                                                                                resp, err := c.executeMethod(ctx, http.MethodPut, reqMetadata)
                                                                                                  func (c *Client) completeMultipartUpload(ctx context.Context, bucketName, objectName, uploadID string,
                                                                                                  complete completeMultipartUpload, opts PutObjectOptions) (UploadInfo, error)
                                                                                                  resp, err := c.executeMethod(ctx, http.MethodPost, reqMetadata)

                                                                                                  除了基本的上传,在examples下面还定义了一些其他的接口的使用例子:

                                                                                                  examples/minio/listen-notification.go

                                                                                                    minioClient.ListenNotification(context.Background(), "PREFIX", "SUFFIX", []string{
                                                                                                    "s3:BucketCreated:*",
                                                                                                    "s3:BucketRemoved:*",
                                                                                                    "s3:ObjectCreated:*",
                                                                                                    "s3:ObjectAccessed:*",
                                                                                                    "s3:ObjectRemoved:*",
                                                                                                    })

                                                                                                    examples/minio/listenbucketnotification.go

                                                                                                      minioClient.ListenBucketNotification(context.Background(), "YOUR-BUCKET", "PREFIX", "SUFFIX", []string{
                                                                                                      "s3:ObjectCreated:*",
                                                                                                      "s3:ObjectAccessed:*",
                                                                                                      "s3:ObjectRemoved:*",
                                                                                                      })

                                                                                                      examples/minio/putobjectsnowball.go

                                                                                                        minioClient.ListObjects(context.Background(), YOURBUCKET, lopts)

                                                                                                        examples/minio/getbucketreplicationmetrics.go

                                                                                                          s3Client.TraceOn(os.Stderr)
                                                                                                          m, err := s3Client.GetBucketReplicationMetrics(context.Background(), "bucket")

                                                                                                          api-bucket-notification.go

                                                                                                            func (c *Client) ListenNotification(ctx context.Context, prefix, suffix string, events []string) <-chan notification.Info 
                                                                                                              func (c *Client) ListenBucketNotification(ctx context.Context, bucketName, prefix, suffix string, events []string) <-chan notification.Info 
                                                                                                              resp, err := c.executeMethod(ctx, http.MethodGet, requestMetadata{
                                                                                                              bucketName: bucketName,
                                                                                                              queryValues: urlValues,
                                                                                                              contentSHA256Hex: emptySHA256Hex,
                                                                                                              })

                                                                                                                    以上就是对mc和sdk源码的简要分析,整体来说就是对minio的接口做了一层http client的封装,加了一些参数校验的逻辑。


                                                                                                              文章转载自golang算法架构leetcode技术php,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                                                                                                              评论