对象存储服务(Object Storage Service,OSS)是一种海量、安全、低成本、高可靠的云存储服务,适合存放任意类型的文件。容量和处理能力弹性扩展,多种存储类型供选择,全面优化存储成本。Minio 除了直接作为对象存储使用,还可以作为云上对象存储服务的网关层,无缝对接到 Amazon S3、Microsoft Azure。在学习minio的源码之前,先阅读下minio的客户端mc和golang sdk minio-go的源码:
https://github.com/minio/minio-go
https://github.com/minio/mc
1,MC
mc是在 minio-go的基础上做了命令行的包装,常用的命令如下:
ls 列出文件和文件夹。mb 创建一个存储桶或一个文件夹。cat 显示文件和对象内容。pipe 将一个STDIN重定向到一个对象或者文件或者STDOUT。share 生成用于共享的URL。cp 拷贝文件和对象。mirror 给存储桶和文件夹做镜像。find 基于参数查找文件。diff 对两个文件夹或者存储桶比较差异。rm 删除文件和对象。events 管理对象通知。watch 监听文件和对象的事件。policy 管理访问策略。session 为cp命令管理保存的会话。config 管理mc配置文件。update 检查软件更新。version 输出版本信息。
当然,入口函数还是定义在main.go中:
func main() {mc.Main(os.Args)}
每一个命令的具体实现都在cmd目录下。由于mc的命令有自己特殊的模板,所以它没有使用常用的cobra,而是自己定义了一套命令行框架minio/cli。首先我们看下Main函数,它定义在cmd/main.go中:
func Main(args []string)mainComplete()defer profile.Start(profile.CPUProfile, profile.ProfilePath(mustGetProfileDir())).Stop()probe.Init()if err := registerApp(appName).Run(args); err != nil
mainComplete定义在cmd/auto-complete.go
func mainComplete() errorfor _, cmd := range appCmds {if cmd.Hidden {continue}complCmds[cmd.Name] = cmdToCompleteCmd(cmd, "")}mcComplete := complete.Command{Sub: complCmds,GlobalFlags: complFlags,}complete.New(filepath.Base(os.Args[0]), mcComplete).Run()
它遍历appCmds,跳过隐藏(Hidden)的命令,把其余命令转换成自动补全用的子命令并注册到complete.Command中。appCmds是mc的子命令列表,定义在cmd/main.go中:
var appCmds = []cli.Command{aliasCmd,lsCmd,mbCmd,rbCmd,cpCmd,mirrorCmd,catCmd,headCmd,pipeCmd,shareCmd,findCmd,sqlCmd,statCmd,mvCmd,treeCmd,duCmd,retentionCmd,legalHoldCmd,diffCmd,rmCmd,versionCmd,ilmCmd,encryptCmd,eventCmd,watchCmd,undoCmd,anonymousCmd,policyCmd,tagCmd,replicateCmd,adminCmd,configCmd,updateCmd,}
probe定义在pkg/probe/probe.go
func Init() {_, file, _, _ := runtime.Caller(1)rootPath = filepath.Dir(file)
其中的Run方法是minio/cli框架提供的接口,App和Command各有一个实现,分别定义在:
minio/cli@v1.22.0/app.go
func (a *App) Run(arguments []string) (err error)a.Setup()c := a.Command(name)if c != nil {return c.Run(context)}
minio/cli@v1.22.0/command.go
func (c Command) Run(ctx *Context) (err error)err = HandleAction(c.Action, context)
下面以tree命令为例,看下实现的细节: cmd/tree-main.go
var treeCmd = cli.Command{Name: "tree",Usage: "list buckets and objects in a tree format",Action: mainTree,OnUsageError: onUsageError,Before: setGlobalsFromContext,Flags: append(treeFlags, globalFlags...),CustomHelpTemplate
对应执行的函数(Action)是mainTree:
func mainTree(cliCtx *cli.Context) errorargs, depth, includeFiles, timeRef := parseTreeSyntax(ctx, cliCtx)if e := doTree(ctx, targetURL, timeRef, 1, false, "", depth, includeFiles); e != nilclnt, err := newClientFromAlias(targetAlias, targetURL)e := doList(ctx, clnt, true, false, false, timeRef, false);
在doTree方法里初始化了一个client,调用了对应的list接口:
func doTree(ctx context.Context, url string, timeRef time.Time, level int, leaf bool, branchString string, depth int, includeFiles bool) errorclnt, err := newClientFromAlias(targetAlias, targetURL)show := func(end bool) errorfor content := range clnt.List(ctx, ListOptions{Recursive: false, TimeRef: timeRef, ShowDir: DirFirst})
Client是一个客户端接口,定义在cmd/client.go:
type Client interface {// Common operationsStat(ctx context.Context, opts StatOptions) (content *ClientContent, err *probe.Error)List(ctx context.Context, opts ListOptions) <-chan *ClientContent
对应有两个具体实现,一个是本地文件系统,一个是s3:
cmd/client-fs.go
func (f *fsClient) List(ctx context.Context, opts ListOptions) <-chan *ClientContentgo f.listRecursiveInRoutine(contentCh, opts.WithMetadata)go f.listDirOpt(contentCh, opts.Incomplete, opts.WithMetadata, opts.ShowDir)go f.listInRoutine(contentCh, opts.WithMetadata)
通过walk方法递归调用visit方法做树状渲染展示:
func (f *fsClient) listRecursiveInRoutine(contentCh chan *ClientContent, isMetadata bool)visitFS := func(fp string, fi os.FileInfo, e error) errore := xfilepath.Walk(dirName, visitFS)
cmd/client-s3.go
func (c *S3Client) List(ctx context.Context, opts ListOptions) <-chan *ClientContentc.versionedList(ctx, contentCh, opts)c.unversionedList(ctx, contentCh, opts)
func (c *S3Client) versionedList(ctx context.Context, contentCh chan *ClientContent, opts ListOptions)b, o := c.url2BucketAndObject()
func (c *S3Client) url2BucketAndObject() (bucketName, objectName string)buckets, err := c.api.ListBuckets(ctx)for _, bucket := range buckets {contentCh <- c.bucketInfo2ClientContent(bucket)for objectVersion := range c.listVersions(ctx, bucket.Name, "",opts.Recursive, opts.TimeRef, opts.WithOlderVersions, opts.WithDeleteMarkers)c.unversionedList(ctx, contentCh, opts)
func (c *S3Client) unversionedList(ctx context.Context, contentCh chan *ClientContent, opts ListOptions)c.listIncompleteRecursiveInRoutine(ctx, contentCh, opts)c.listIncompleteInRoutine(ctx, contentCh, opts)c.listRecursiveInRoutine(ctx, contentCh, opts)c.listInRoutine(ctx, contentCh, opts)
func (c *S3Client) listIncompleteRecursiveInRoutine(ctx context.Context, contentCh chan *ClientContent, opts ListOptions)buckets, err := c.api.ListBuckets(ctx)for _, bucket := range buckets {for object := range c.api.ListIncompleteUploads(ctx, bucket.Name, o, isRecursive)
func (c *S3Client) listRecursiveInRoutine(ctx context.Context, contentCh chan *ClientContent, opts ListOptions)buckets, err := c.api.ListBuckets(ctx)for object := range c.listObjectWrapper(ctx, bucket.Name, o, isRecursive, time.Time{}, false, false, opts.WithMetadata, -1)
func (c *S3Client) listObjectWrapper(ctx context.Context, bucket, object string, isRecursive bool, timeRef time.Time, withVersions, withDeleteMarkers bool, metadata bool, maxKeys int) <-chan minio.ObjectInforeturn c.listVersions(ctx, bucket, object, isRecursive, timeRef, withVersions, withDeleteMarkers)c.api.ListObjects(ctx, bucket, minio.ListObjectsOptions{Prefix: object, Recursive: isRecursive, UseV1: true, MaxKeys: maxKeys})c.api.ListObjects(ctx, bucket, minio.ListObjectsOptions{Prefix: object, Recursive: isRecursive, WithMetadata: metadata, MaxKeys: maxKeys})
最终都是调用了SDK中的对应方法:
minio/minio-go/v7@v7.0.16-0.20211108161804-a7a36ee131df/api-list.go
func (c *Client) ListObjects(ctx context.Context, bucketName string, opts ListObjectsOptions) <-chan ObjectInfo
func (c *Client) ListIncompleteUploads(ctx context.Context, bucketName, objectPrefix string, recursive bool) <-chan ObjectMultipartInfo {return c.listIncompleteUploads(ctx, bucketName, objectPrefix, recursive)}
我们看下另外一个ls命令,实现也是类似的:
cmd/ls-main.go
var lsCmd = cli.Command{Name: "ls",Usage: "list buckets and objects",Action: mainList,OnUsageError: onUsageError,Before: setGlobalsFromContext,Flags: append(lsFlags, globalFlags...),CustomHelpTemplate:
func mainList(cliCtx *cli.Context) errorif e := doList(ctx, clnt, isRecursive, isIncomplete, isSummary, timeRef, withOlderVersions); e != nil
cmd/ls.go
func doList(ctx context.Context, clnt Client, isRecursive, isIncomplete, isSummary bool, timeRef time.Time, withOlderVersions bool) errorfor content := range clnt.List(ctx, ListOptions{Recursive: isRecursive,Incomplete: isIncomplete,TimeRef: timeRef,WithOlderVersions: withOlderVersions || !timeRef.IsZero(),WithDeleteMarkers: true,ShowDir: DirNone,})
2,SDK:minio-go
首先我们看下sdk是如何使用的:
1,创建client对象:
minioClient, err := minio.New(endpoint, &minio.Options{Creds: credentials.NewStaticV4(accessKeyID, secretAccessKey, ""),Secure: useSSL,})
2,创建bucket,或者确认bucket是否存在:
err = minioClient.MakeBucket(ctx, bucketName, minio.MakeBucketOptions{Region: location})
exists, errBucketExists := minioClient.BucketExists(ctx, bucketName)
3,上传文件(创建对象):
info, err := minioClient.FPutObject(ctx, bucketName, objectName, filePath, minio.PutObjectOptions{ContentType: contentType})
client对象定义在api.go
type Client struct {/// Standard options.// Parsed endpoint url provided by the user.endpointURL *url.URL// Holds various credential providers.credsProvider *credentials.Credentials// Custom signerType value overrides all credentials.overrideSignerType credentials.SignatureType// User supplied.appInfo struct {appName stringappVersion string}// Indicate whether we are using https or notsecure bool// Needs allocation.httpClient *http.ClientbucketLocCache *bucketLocationCache// Advanced functionality.isTraceEnabled booltraceErrorsOnly booltraceOutput io.Writer// S3 specific accelerated endpoint.s3AccelerateEndpoint string// Region endpointregion string// Random seed.random *rand.Rand// lookup indicates type of url lookup supported by server. If not specified,// default to Auto.lookup BucketLookupType// Factory for MD5 hash functions.md5Hasher func() md5simd.Hashersha256Hasher func() md5simd.HasherhealthStatus int32}
func New(endpoint string, opts *Options) (*Client, error)
这个文件下还定义了一个executeMethod方法,这个方法是所有http请求的入口:
func (c *Client) executeMethod(ctx context.Context, method string, metadata requestMetadata) (res *http.Response, err error)bodyCloser, ok := metadata.contentBody.(io.Closer)for range c.newRetryTimer(retryCtx, reqRetry, DefaultRetryUnit, DefaultRetryCap, MaxJitter)req, err = c.newRequest(ctx, method, metadata)res, err = c.do(req)
do方法简单包装了http client的do方法:
func (c *Client) do(req *http.Request) (resp *http.Response, err error)resp, err = c.httpClient.Do(req)
api-put-bucket.go里定义了创建bucket的方法:
func (c *Client) MakeBucket(ctx context.Context, bucketName string, opts MakeBucketOptions) (err error) {return c.makeBucket(ctx, bucketName, opts)}
func (c *Client) makeBucket(ctx context.Context, bucketName string, opts MakeBucketOptions) (err error)err = c.doMakeBucket(ctx, bucketName, opts.Region, opts.ObjectLocking)
最终调用了上述executeMethod方法:
func (c *Client) doMakeBucket(ctx context.Context, bucketName string, location string, objectLockEnabled bool) (err error)createBucketConfigBytes, err = xml.Marshal(createBucketConfig)reqMetadata.contentMD5Base64 = sumMD5Base64(createBucketConfigBytes)resp, err := c.executeMethod(ctx, http.MethodPut, reqMetadata)
BucketExists方法定义在api-stat.go
func (c *Client) BucketExists(ctx context.Context, bucketName string) (bool, error)resp, err := c.executeMethod(ctx, http.MethodHead, requestMetadata{bucketName: bucketName,contentSHA256Hex: emptySHA256Hex,})
FPutObject定义在:api-put-object-file-context.go
func (c *Client) FPutObject(ctx context.Context, bucketName, objectName, filePath string, opts PutObjectOptions) (info UploadInfo, err error)fileReader, err := os.Open(filePath)fileStat, err := fileReader.Stat()fileSize := fileStat.Size()return c.PutObject(ctx, bucketName, objectName, fileReader, fileSize, opts)
api-put-object.go
func (c *Client) PutObject(ctx context.Context, bucketName, objectName string, reader io.Reader, objectSize int64,opts PutObjectOptions) (info UploadInfo, err error)return c.putObjectCommon(ctx, bucketName, objectName, reader, objectSize, opts)
根据大小和url的类型确定上传方式,可以整体也可以分片,还可以流式
func (c *Client) putObjectCommon(ctx context.Context, bucketName, objectName string, reader io.Reader, size int64, opts PutObjectOptions) (info UploadInfo, err error)if size > int64(maxMultipartPutObjectSize)if s3utils.IsGoogleEndpoint(*c.endpointURL) {return c.putObject(ctx, bucketName, objectName, reader, size, opts)if c.overrideSignerType.IsV2() {if size >= 0 && size < int64(partSize) || opts.DisableMultipart {return c.putObject(ctx, bucketName, objectName, reader, size, opts)}return c.putObjectMultipart(ctx, bucketName, objectName, reader, size, opts)}return c.putObjectMultipartStream(ctx, bucketName, objectName, reader, size, opts)
上面用到的maxMultipartPutObjectSize是一个常量,表示单个对象最大允许5T大小。
api-put-object-streaming.go
func (c *Client) putObject(ctx context.Context, bucketName, objectName string, reader io.Reader, size int64, opts PutObjectOptions) (info UploadInfo, err error)return c.putObjectDo(ctx, bucketName, objectName, readSeeker, md5Base64, "", size, opts)
func (c *Client) putObjectDo(ctx context.Context, bucketName, objectName string, reader io.Reader, md5Base64, sha256Hex string, size int64, opts PutObjectOptions) (UploadInfo, error)resp, err := c.executeMethod(ctx, http.MethodPut, reqMetadata)
api-put-object-multipart.go
func (c *Client) putObjectMultipart(ctx context.Context, bucketName, objectName string, reader io.Reader, size int64,opts PutObjectOptions) (info UploadInfo, err error)info, err = c.putObjectMultipartNoStream(ctx, bucketName, objectName, reader, opts)return c.putObject(ctx, bucketName, objectName, reader, size, opts)
分片上传先根据分片大小计算出分片数目,然后创建上传的id(uploadID),接着逐个上传分片,最后合并分片:
func (c *Client) putObjectMultipartNoStream(ctx context.Context, bucketName, objectName string, reader io.Reader, opts PutObjectOptions) (info UploadInfo, err error)totalPartsCount, partSize, _, err := OptimalPartInfo(-1, opts.PartSize)uploadID, err := c.newUploadID(ctx, bucketName, objectName, opts)for partNumber <= totalPartsCount {objPart, uerr := c.uploadPart(ctx, bucketName, objectName, uploadID, rd, partNumber,md5Base64, sha256Hex, int64(length), opts.ServerSideEncryption)uploadInfo, err := c.completeMultipartUpload(ctx, bucketName, objectName, uploadID, complMultipartUpload, PutObjectOptions{})
func (c *Client) uploadPart(ctx context.Context, bucketName, objectName, uploadID string, reader io.Reader,partNumber int, md5Base64, sha256Hex string, size int64, sse encrypt.ServerSide) (ObjectPart, error)resp, err := c.executeMethod(ctx, http.MethodPut, reqMetadata)
func (c *Client) completeMultipartUpload(ctx context.Context, bucketName, objectName, uploadID string,complete completeMultipartUpload, opts PutObjectOptions) (UploadInfo, error)resp, err := c.executeMethod(ctx, http.MethodPost, reqMetadata)
除了基本的上传,在examples下面还定义了一些其他的接口的使用例子:
examples/minio/listen-notification.go
minioClient.ListenNotification(context.Background(), "PREFIX", "SUFFIX", []string{"s3:BucketCreated:*","s3:BucketRemoved:*","s3:ObjectCreated:*","s3:ObjectAccessed:*","s3:ObjectRemoved:*",})
examples/minio/listenbucketnotification.go
minioClient.ListenBucketNotification(context.Background(), "YOUR-BUCKET", "PREFIX", "SUFFIX", []string{"s3:ObjectCreated:*","s3:ObjectAccessed:*","s3:ObjectRemoved:*",})
examples/minio/putobjectsnowball.go
minioClient.ListObjects(context.Background(), YOURBUCKET, lopts)
examples/minio/getbucketreplicationmetrics.go
s3Client.TraceOn(os.Stderr)m, err := s3Client.GetBucketReplicationMetrics(context.Background(), "bucket")
api-bucket-notification.go
func (c *Client) ListenNotification(ctx context.Context, prefix, suffix string, events []string) <-chan notification.Info
func (c *Client) ListenBucketNotification(ctx context.Context, bucketName, prefix, suffix string, events []string) <-chan notification.Inforesp, err := c.executeMethod(ctx, http.MethodGet, requestMetadata{bucketName: bucketName,queryValues: urlValues,contentSHA256Hex: emptySHA256Hex,})
以上就是对mc和sdk源码的简要分析,整体来说就是对minio的接口做了一层http client的封装,并加了一些参数校验的逻辑。






