暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

golang 源码分析:minio(part II)文件的操作

        在分析完minio请求的路由(见《golang 源码分析:minio(part I)路由》)之后,我们来看一个文件是如何落盘的。不考虑gateway的情况,我们从serverMain开始:

cmd/server-main.go

    func serverMain(ctx *cli.Context) {
    handler, err := configureServerHandler(globalEndpoints)
    registerAPIRouter(router)
    newObject, err := newObjectLayer(GlobalContext, globalEndpoints)
          initServer(GlobalContext, newObject)

    在注册完路由后,会创建一个object,然后调用initServer,在initServer里通过setObjectLayer把这个object保存起来:

      func initServer(ctx context.Context, newObject ObjectLayer) error 
      setObjectLayer(newObject)

      在创建object的时候,有两个分支,我们重点看只有一个endpoint的情况:

      cmd/erasure-server-pool.go

        func newErasureServerPools(ctx context.Context, endpointServerPools EndpointServerPools) (ObjectLayer, error) 
        commonParityDrives = ecDrivesNoConfig(ep.DrivesPerSet)
        err = storageclass.ValidateParity(commonParityDrives, ep.DrivesPerSet)
        z.serverPools[i], err = newErasureSets(ctx, ep.Endpoints, storageDisks[i], formats[i], commonParityDrives, i)

        cmd/server-main.go

          func newObjectLayer(ctx context.Context, endpointServerPools EndpointServerPools) (newObject ObjectLayer, err error)
          return NewFSObjectLayer(endpointServerPools[0].Endpoints[0].Path)
          return newErasureServerPools(ctx, endpointServerPools)

          object 存储在全局变量globalObjectAPI里,对应的get和set方法定义在cmd/api-router.go,由于是全局变量,所以读取时需要加读锁

            func setObjectLayer(o ObjectLayer) {
            globalObjectAPI = o
              func newObjectLayerFn() ObjectLayer {
              globalObjLayerMutex.RLock()
              defer globalObjLayerMutex.RUnlock()
              return globalObjectAPI
              }

              在registerAPIRouter方法内部会初始化api对象,它的ObjectAPI字段是用newObjectLayerFn来赋值的,这样就实现了object对象和router的关联,相关的操作最终都是调用object对象的方法来完成的

                func registerAPIRouter(router *mux.Router) 
                api := objectAPIHandlers{
                ObjectAPI: newObjectLayerFn,
                CacheAPI: newCachedObjectLayerFn,
                }

                下面以三个路由为例看下文件的操作相关流程

                  // GetObject - note gzip compression is *not* added due to Range requests.
                  router.Methods(http.MethodGet).Path("/{object:.+}").HandlerFunc(
                  collectAPIStats("getobject", maxClients(httpTraceHdrs(api.GetObjectHandler))))
                    router.Methods(http.MethodPut).Path("/{object:.+}").HandlerFunc(
                    collectAPIStats("putobjectpart", maxClients(gz(httpTraceHdrs(api.PutObjectPartHandler))))).Queries("partNumber", "{partNumber:[0-9]+}", "uploadId", "{uploadId:.*}")
                      router.Methods(http.MethodPut).Path("/{object:.+}").HandlerFunc(
                      collectAPIStats("putobject", maxClients(gz(httpTraceHdrs(api.PutObjectHandler)))))

                      globalObjectAPI 定义在cmd/object-api-common.go

                        var globalObjectAPI ObjectLayer
                        bucketMetaPrefix = "buckets"

                        和router对应的handler定义在:cmd/object-handlers.go

                          func (api objectAPIHandlers) GetObjectHandler(w http.ResponseWriter, r *http.Request) 
                          objectAPI := api.ObjectAPI()
                          api.getObjectHandler(ctx, objectAPI, bucket, object, w, r)

                          先获取handler getObjectInfo 然后调用handler

                            func (api objectAPIHandlers) getObjectHandler(ctx context.Context, objectAPI ObjectLayer, bucket, object string, w http.ResponseWriter, r *http.Request)
                            getObjectInfo := objectAPI.GetObjectInfo
                            _, err = getObjectInfo(ctx, bucket, object, opts)
                            getObjectNInfo := objectAPI.GetObjectNInfo
                            gr, err := getObjectNInfo(ctx, bucket, object, rs, r.Header, readLock, opts)
                                
                                sendEvent(eventArgs{
                            EventName: event.ObjectAccessedGet,
                            BucketName: bucket,
                            Object: objInfo,
                            ReqParams: extractReqParams(r),
                            RespElements: extractRespElements(w),
                            UserAgent: r.UserAgent(),
                            Host: handlers.GetSourceIP(r),
                            })

                            分片上传也是一样的:

                              func (api objectAPIHandlers) PutObjectPartHandler(w http.ResponseWriter, r *http.Request)
                              objectAPI := api.ObjectAPI()
                              mi, err := objectAPI.GetMultipartInfo(ctx, bucket, object, uploadID, opts)
                              putObjectPart := objectAPI.PutObjectPart
                              partInfo, err := putObjectPart(ctx, bucket, object, uploadID, partID, pReader, opts)

                              全量上传:

                                func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Request) 
                                objectAPI := api.ObjectAPI()
                                putObject = objectAPI.PutObject
                                objInfo, err := putObject(ctx, bucket, object, pReader, opts)
                                getObjectInfo := objectAPI.GetObjectInfo
                                _, err := updateObjectMetadataWithZipInfo(ctx, objectAPI, bucket, object, opts);

                                单机版本的minio 实现的存储对象定义在:cmd/fs-v1.go

                                  func NewFSObjectLayer(fsPath string) (ObjectLayer, error) 
                                  fsPath, err = getValidPath(fsPath);
                                  fsUUID := mustGetUUID()
                                  err = initMetaVolumeFS(fsPath, fsUUID);
                                  rlk, err := initFormatFS(ctx, fsPath)
                                     
                                     fs := &FSObjects{
                                  fsPath: fsPath,
                                  metaJSONFile: fsMetaJSONFile,
                                  fsUUID: fsUUID,
                                  rwPool: &fsIOPool{
                                  readersMap: make(map[string]*lock.RLockedFile),
                                  },
                                  nsMutex: newNSLock(false),
                                  listPool: NewTreeWalkPool(globalLookupTimeout),
                                  appendFileMap: make(map[string]*fsAppendFile),
                                  diskMount: mountinfo.IsLikelyMountPoint(fsPath),
                                  }
                                  fs.fsFormatRlk = rlk

                                  会在fsPath下创建minio自己的元数据目录(.minio.sys,以及其下的tmp和multipart目录):

                                    func initMetaVolumeFS(fsPath, fsUUID string) error 
                                    metaBucketPath := pathJoin(fsPath, minioMetaBucket)
                                    err := os.MkdirAll(metaBucketPath, 0777);
                                    metaTmpPath := pathJoin(fsPath, minioMetaTmpBucket, fsUUID)
                                    err := os.MkdirAll(metaTmpPath, 0777);
                                    metaMultipartPath := pathJoin(fsPath, minioMetaMultipartBucket)
                                          os.MkdirAll(metaMultipartPath, 0777)
                                      func (fs *FSObjects) GetObjectInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (oi ObjectInfo, e error) 
                                      oi, err := fs.getObjectInfoWithLock(ctx, bucket, object)
                                      fsMetaPath := pathJoin(fs.fsPath, minioMetaBucket, bucketMetaPrefix, bucket, object, fs.metaJSONFile)
                                      err = fs.createFsJSON(object, fsMetaPath)

                                      读取的时候需要加锁

                                        func (fs *FSObjects) getObjectInfoWithLock(ctx context.Context, bucket, object string) (oi ObjectInfo, err error)
                                        lk := fs.NewNSLock(bucket, object)
                                        _, err := fs.statBucketDir(ctx, bucket);
                                        return fs.getObjectInfo(ctx, bucket, object)

                                        最终通过fsStat函数获取文件的信息:

                                          func (fs *FSObjects) statBucketDir(ctx context.Context, bucket string) (os.FileInfo, error)
                                          bucketDir, err := fs.getBucketDir(ctx, bucket)
                                          st, err := fsStatVolume(ctx, bucketDir)
                                          fi, err := fsStat(ctx, volume)
                                            func (fs *FSObjects) getObjectInfo(ctx context.Context, bucket, object string) (oi ObjectInfo, e error)
                                            fi, err := fsStatDir(ctx, pathJoin(fs.fsPath, bucket, object))
                                            fsMetaPath := pathJoin(fs.fsPath, minioMetaBucket, bucketMetaPrefix, bucket, object, fs.metaJSONFile)
                                            rlk, err := fs.rwPool.Open(fsMetaPath)
                                            _, rerr := fsMeta.ReadFrom(ctx, rlk.LockedFile)
                                            fi, err := fsStatFile(ctx, pathJoin(fs.fsPath, bucket, object))
                                            return fsMeta.ToObjectInfo(bucket, object, fi), nil

                                            创建文件的过程中,先在tmp目录下创建文件,等待文件创建完毕后,rename到目标目录,能够尽可能地减少锁冲突:

                                              func (fs *FSObjects) PutObject(ctx context.Context, bucket string, object string, r *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error) 
                                              return fs.putObject(ctx, bucket, object, r, opts)

                                              创建文件的过程中,会先创建meta文件,保存文件的元数据信息:

                                                func (fs *FSObjects) putObject(ctx context.Context, bucket string, object string, r *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, retErr error) 
                                                fs.statBucketDir(ctx, bucket);
                                                isObjectDir(object, data.Size())
                                                if fi, err = fsStatDir(ctx, pathJoin(fs.fsPath, bucket, object)); err != nil
                                                var wlk *lock.LockedFile
                                                if bucket != minioMetaBucket {
                                                bucketMetaDir := pathJoin(fs.fsPath, minioMetaBucket, bucketMetaPrefix)
                                                fsMetaPath := pathJoin(bucketMetaDir, bucket, object, fs.metaJSONFile)
                                                wlk, err = fs.rwPool.Write(fsMetaPath)
                                                wlk, err = fs.rwPool.Create(fsMetaPath)
                                                fsTmpObjPath := pathJoin(fs.fsPath, minioMetaTmpBucket, fs.fsUUID, tempObj)
                                                bytesWritten, err := fsCreateFile(ctx, fsTmpObjPath, data, data.Size())
                                                fsNSObjPath := pathJoin(fs.fsPath, bucket, object)
                                                if err = fsRenameFile(ctx, fsTmpObjPath, fsNSObjPath);
                                                _, err = fsMeta.WriteTo(wlk);
                                                if err = jsonSave(lk, m);
                                                fi, err := fsStatFile(ctx, pathJoin(fs.fsPath, bucket, object))

                                                cmd/fs-v1-helpers.go

                                                  func fsCreateFile(ctx context.Context, filePath string, reader io.Reader, fallocSize int64) (int64, error) 
                                                  if err := checkPathLength(filePath);
                                                  writer, err := lock.Open(filePath, flags, 0666)
                                                  bytesWritten, err := xioutil.Copy(writer, reader)

                                                  cmd/object-api-utils.go文件里定义了minio文件系统中目录的名字

                                                    const (
                                                    // MinIO meta bucket.
                                                    minioMetaBucket = ".minio.sys"
                                                    minioMetaTmpBucket = minioMetaBucket + "/tmp"
                                                    mpartMetaPrefix = "multipart"
                                                    minioMetaMultipartBucket = minioMetaBucket + SlashSeparator + mpartMetaPrefix

                                                    cmd/data-usage.go

                                                      dataUsageBucket = minioMetaBucket + SlashSeparator + bucketMetaPrefix

                                                      cmd/format-fs.go

                                                        func initFormatFS(ctx context.Context, fsPath string) (rlk *lock.RLockedFile, err error) 
                                                        fsFormatPath := pathJoin(fsPath, minioMetaBucket, formatConfigFile)
                                                        err := formatFSFixDeploymentID(ctx, fsFormatPath); err != nil
                                                        formatBackend, err := formatMetaGetFormatBackendFS(rlk)
                                                        err = jsonLoad(rlk, format)
                                                        return jsonSave(wlk, format)
                                                        rlk, err := lock.RLockedOpenFile(fsFormatPath)
                                                        formatBackend, err := formatMetaGetFormatBackendFS(rlk)
                                                        wlk, err = lock.TryLockedOpenFile(fsFormatPath, os.O_RDWR, 0)

                                                        cmd/format-meta.go

                                                          formatConfigFile = "format.json"

                                                          每一个操作都会发送相应的通知,通知的目标(target)保存在一个map里,相关代码在cmd/notification.go

                                                            func sendEvent(args eventArgs)
                                                                globalNotificationSys.Send(args)
                                                            func (sys *NotificationSys) Send(args eventArgs)
                                                            targetIDSet := sys.bucketRulesMap[args.BucketName].Match(args.EventName, args.Object.Name)
                                                            sys.targetList.Send(args.ToEvent(true), targetIDSet, sys.targetResCh)

                                                            internal/event/targetlist.go

                                                              type TargetList struct {
                                                              sync.RWMutex
                                                              targets map[TargetID]Target
                                                              }

                                                              元文件的定义如下:cmd/fs-v1-metadata.go

                                                                type fsMetaV1 struct {
                                                                Version string `json:"version"`
                                                                // checksums of blocks on disk.
                                                                Checksum FSChecksumInfoV1 `json:"checksum,omitempty"`
                                                                // Metadata map for current object.
                                                                Meta map[string]string `json:"meta,omitempty"`
                                                                // parts info for current object - used in encryption.
                                                                Parts []ObjectPartInfo `json:"parts,omitempty"`
                                                                }

                                                                cmd/xl-storage-format-v1.go

                                                                  type ObjectPartInfo struct {
                                                                  ETag string `json:"etag,omitempty"`
                                                                  Number int `json:"number"`
                                                                  Size int64 `json:"size"`
                                                                  ActualSize int64 `json:"actualSize"`
                                                                  }

                                                                  cmd/fs-v1-helpers.go

                                                                    func fsStatDir(ctx context.Context, statDir string) (os.FileInfo, error)
                                                                    fi, err := fsStat(ctx, statDir)

                                                                    分片上传相对复杂,代码路径在:cmd/fs-v1-multipart.go

                                                                      func (fs *FSObjects) PutObjectPart(ctx context.Context, bucket, object, uploadID string, partID int, r *PutObjReader, opts ObjectOptions) (pi PartInfo, e error)
                                                                      if _, err := fs.statBucketDir(ctx, bucket); err != nil {
                                                                      uploadIDDir := fs.getUploadIDDir(bucket, object, uploadID)
                                                                      _, err := fsStatFile(ctx, pathJoin(uploadIDDir, fs.metaJSONFile))
                                                                      tmpPartPath := pathJoin(fs.fsPath, minioMetaTmpBucket, fs.fsUUID, uploadID+"."+mustGetUUID()+"."+strconv.Itoa(partID))
                                                                      bytesWritten, err := fsCreateFile(ctx, tmpPartPath, data, data.Size())
                                                                      defer fsRemoveFile(ctx, tmpPartPath)
                                                                      partPath := pathJoin(uploadIDDir, fs.encodePartFile(partID, etag, data.ActualSize()))
                                                                      err = fsSimpleRenameFile(ctx, tmpPartPath, partPath);
                                                                      go fs.backgroundAppend(ctx, bucket, object, uploadID)
                                                                      fi, err := fsStatFile(ctx, partPath)


                                                                      文章转载自golang算法架构leetcode技术php,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                                                                      评论