在上一篇《golang 源码分析:minio(part I)路由》中分析完 minio 请求的路由后,我们接着来看一个文件是如何落盘的。这里不考虑 gateway 的情况,从 serverMain 开始:
cmd/server-main.go
func serverMain(ctx *cli.Context) {handler, err := configureServerHandler(globalEndpoints)registerAPIRouter(router)newObject, err := newObjectLayer(GlobalContext, globalEndpoints)initServer(GlobalContext, newObject)
在注册完路由后,会通过 newObjectLayer 创建一个 object,然后调用 initServer;initServer 内部通过 setObjectLayer 把这个 object 保存到全局变量 globalObjectAPI 中:
func initServer(ctx context.Context, newObject ObjectLayer) errorsetObjectLayer(newObject)
在创建 object 的时候,newObjectLayer 里有两个分支:一个走 NewFSObjectLayer,另一个走 newErasureServerPools。我们重点看只有一个 endpoint 的情况(即 NewFSObjectLayer):
cmd/erasure-server-pool.go
func newErasureServerPools(ctx context.Context, endpointServerPools EndpointServerPools) (ObjectLayer, error)commonParityDrives = ecDrivesNoConfig(ep.DrivesPerSet)err = storageclass.ValidateParity(commonParityDrives, ep.DrivesPerSet)z.serverPools[i], err = newErasureSets(ctx, ep.Endpoints, storageDisks[i], formats[i], commonParityDrives, i)
cmd/server-main.go
func newObjectLayer(ctx context.Context, endpointServerPools EndpointServerPools) (newObject ObjectLayer, err error)return NewFSObjectLayer(endpointServerPools[0].Endpoints[0].Path)return newErasureServerPools(ctx, endpointServerPools)
object 存储在全局变量 globalObjectAPI 里,对应的 get 和 set 方法(newObjectLayerFn 和 setObjectLayer)定义在 cmd/api-router.go。由于是全局变量,可能被并发访问,所以读取时需要加读锁:
func setObjectLayer(o ObjectLayer) {globalObjectAPI = o
func newObjectLayerFn() ObjectLayer {globalObjLayerMutex.RLock()defer globalObjLayerMutex.RUnlock()return globalObjectAPI}
在 registerAPIRouter 方法内部会初始化 api 对象(objectAPIHandlers),它的 ObjectAPI 字段是用 newObjectLayerFn 来赋值的,这样就实现了 object 对象和 router 的关联:各个 handler 的相关操作最终都是调用 object 对象的方法来完成的。
func registerAPIRouter(router *mux.Router)api := objectAPIHandlers{ObjectAPI: newObjectLayerFn,CacheAPI: newCachedObjectLayerFn,}
下面以三个路由为例看下文件的操作相关流程
// GetObject - note gzip compression is *not* added due to Range requests.router.Methods(http.MethodGet).Path("/{object:.+}").HandlerFunc(collectAPIStats("getobject", maxClients(httpTraceHdrs(api.GetObjectHandler))))
router.Methods(http.MethodPut).Path("/{object:.+}").HandlerFunc(collectAPIStats("putobjectpart", maxClients(gz(httpTraceHdrs(api.PutObjectPartHandler))))).Queries("partNumber", "{partNumber:[0-9]+}", "uploadId", "{uploadId:.*}")
router.Methods(http.MethodPut).Path("/{object:.+}").HandlerFunc(collectAPIStats("putobject", maxClients(gz(httpTraceHdrs(api.PutObjectHandler)))))
globalObjectAPI 定义在cmd/object-api-common.go
var globalObjectAPI ObjectLayerbucketMetaPrefix = "buckets"
和router对应的handler定义在:cmd/object-handlers.go
func (api objectAPIHandlers) GetObjectHandler(w http.ResponseWriter, r *http.Request)objectAPI := api.ObjectAPI()api.getObjectHandler(ctx, objectAPI, bucket, object, w, r)
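getObjectHandler 内部先取得 object 对象上的方法(例如把 objectAPI.GetObjectInfo 赋值给 getObjectInfo,把 objectAPI.GetObjectNInfo 赋值给 getObjectNInfo),然后再调用这些方法,最后通过 sendEvent 发送事件通知: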
func (api objectAPIHandlers) getObjectHandler(ctx context.Context, objectAPI ObjectLayer, bucket, object string, w http.ResponseWriter, r *http.Request)getObjectInfo := objectAPI.GetObjectInfo_, err = getObjectInfo(ctx, bucket, object, opts)getObjectNInfo := objectAPI.GetObjectNInfogr, err := getObjectNInfo(ctx, bucket, object, rs, r.Header, readLock, opts)sendEvent(eventArgs{EventName: event.ObjectAccessedGet,BucketName: bucket,Object: objInfo,ReqParams: extractReqParams(r),RespElements: extractRespElements(w),UserAgent: r.UserAgent(),Host: handlers.GetSourceIP(r),})
分片上传(PutObjectPart)的处理流程也是一样的:
func (api objectAPIHandlers) PutObjectPartHandler(w http.ResponseWriter, r *http.Request)objectAPI := api.ObjectAPI()mi, err := objectAPI.GetMultipartInfo(ctx, bucket, object, uploadID, opts)putObjectPart := objectAPI.PutObjectPartpartInfo, err := putObjectPart(ctx, bucket, object, uploadID, partID, pReader, opts)
全量上传:
func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Request)objectAPI := api.ObjectAPI()putObject = objectAPI.PutObjectobjInfo, err := putObject(ctx, bucket, object, pReader, opts)getObjectInfo := objectAPI.GetObjectInfo_, err := updateObjectMetadataWithZipInfo(ctx, objectAPI, bucket, object, opts);
单机版本的minio 实现的存储对象定义在:cmd/fs-v1.go
func NewFSObjectLayer(fsPath string) (ObjectLayer, error)fsPath, err = getValidPath(fsPath);fsUUID := mustGetUUID()err = initMetaVolumeFS(fsPath, fsUUID);rlk, err := initFormatFS(ctx, fsPath)fs := &FSObjects{fsPath: fsPath,metaJSONFile: fsMetaJSONFile,fsUUID: fsUUID,rwPool: &fsIOPool{readersMap: make(map[string]*lock.RLockedFile),},nsMutex: newNSLock(false),listPool: NewTreeWalkPool(globalLookupTimeout),appendFileMap: make(map[string]*fsAppendFile),diskMount: mountinfo.IsLikelyMountPoint(fsPath),}fs.fsFormatRlk = rlk
初始化时会在 fsPath 下创建 minio 自己的元数据目录(meta bucket、tmp 目录和 multipart 目录):
func initMetaVolumeFS(fsPath, fsUUID string) errormetaBucketPath := pathJoin(fsPath, minioMetaBucket)err := os.MkdirAll(metaBucketPath, 0777);metaTmpPath := pathJoin(fsPath, minioMetaTmpBucket, fsUUID)err := os.MkdirAll(metaTmpPath, 0777);metaMultipartPath := pathJoin(fsPath, minioMetaMultipartBucket)os.MkdirAll(metaMultipartPath, 0777)
func (fs *FSObjects) GetObjectInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (oi ObjectInfo, e error)oi, err := fs.getObjectInfoWithLock(ctx, bucket, object)fsMetaPath := pathJoin(fs.fsPath, minioMetaBucket, bucketMetaPrefix, bucket, object, fs.metaJSONFile)err = fs.createFsJSON(object, fsMetaPath)
读取的时候需要加锁
func (fs *FSObjects) getObjectInfoWithLock(ctx context.Context, bucket, object string) (oi ObjectInfo, err error)lk := fs.NewNSLock(bucket, object)_, err := fs.statBucketDir(ctx, bucket);return fs.getObjectInfo(ctx, bucket, object)
最终通过 fsStat 获取文件的信息(fsStat 是 minio 自己封装的辅助函数,而不是系统调用本身):
func (fs *FSObjects) statBucketDir(ctx context.Context, bucket string) (os.FileInfo, error)bucketDir, err := fs.getBucketDir(ctx, bucket)st, err := fsStatVolume(ctx, bucketDir)fi, err := fsStat(ctx, volume)
func (fs *FSObjects) getObjectInfo(ctx context.Context, bucket, object string) (oi ObjectInfo, e error)fi, err := fsStatDir(ctx, pathJoin(fs.fsPath, bucket, object))fsMetaPath := pathJoin(fs.fsPath, minioMetaBucket, bucketMetaPrefix, bucket, object, fs.metaJSONFile)rlk, err := fs.rwPool.Open(fsMetaPath)_, rerr := fsMeta.ReadFrom(ctx, rlk.LockedFile)fi, err := fsStatFile(ctx, pathJoin(fs.fsPath, bucket, object))return fsMeta.ToObjectInfo(bucket, object, fi), nil
创建文件的过程中,先在tmp目录下创建文件,等待文件创建完毕后,rename到目标目录,能够尽可能地减少锁冲突:
func (fs *FSObjects) PutObject(ctx context.Context, bucket string, object string, r *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error)return fs.putObject(ctx, bucket, object, r, opts)
创建文件的过程中,会先创建meta文件,保存文件的元数据信息:
func (fs *FSObjects) putObject(ctx context.Context, bucket string, object string, r *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, retErr error)fs.statBucketDir(ctx, bucket);isObjectDir(object, data.Size())if fi, err = fsStatDir(ctx, pathJoin(fs.fsPath, bucket, object)); err != nilvar wlk *lock.LockedFileif bucket != minioMetaBucket {bucketMetaDir := pathJoin(fs.fsPath, minioMetaBucket, bucketMetaPrefix)fsMetaPath := pathJoin(bucketMetaDir, bucket, object, fs.metaJSONFile)wlk, err = fs.rwPool.Write(fsMetaPath)wlk, err = fs.rwPool.Create(fsMetaPath)fsTmpObjPath := pathJoin(fs.fsPath, minioMetaTmpBucket, fs.fsUUID, tempObj)bytesWritten, err := fsCreateFile(ctx, fsTmpObjPath, data, data.Size())fsNSObjPath := pathJoin(fs.fsPath, bucket, object)if err = fsRenameFile(ctx, fsTmpObjPath, fsNSObjPath);_, err = fsMeta.WriteTo(wlk);if err = jsonSave(lk, m);fi, err := fsStatFile(ctx, pathJoin(fs.fsPath, bucket, object))
cmd/fs-v1-helpers.go
func fsCreateFile(ctx context.Context, filePath string, reader io.Reader, fallocSize int64) (int64, error)if err := checkPathLength(filePath);writer, err := lock.Open(filePath, flags, 0666)bytesWritten, err := xioutil.Copy(writer, reader)
cmd/object-api-utils.go 文件里定义了 minio 文件系统中各个目录的名字:
const (// MinIO meta bucket.minioMetaBucket = ".minio.sys"minioMetaTmpBucket = minioMetaBucket + "/tmp"mpartMetaPrefix = "multipart"minioMetaMultipartBucket = minioMetaBucket + SlashSeparator + mpartMetaPrefix
cmd/data-usage.go
dataUsageBucket = minioMetaBucket + SlashSeparator + bucketMetaPrefix
cmd/format-fs.go
func initFormatFS(ctx context.Context, fsPath string) (rlk *lock.RLockedFile, err error)fsFormatPath := pathJoin(fsPath, minioMetaBucket, formatConfigFile)err := formatFSFixDeploymentID(ctx, fsFormatPath); err != nilformatBackend, err := formatMetaGetFormatBackendFS(rlk)err = jsonLoad(rlk, format)return jsonSave(wlk, format)rlk, err := lock.RLockedOpenFile(fsFormatPath)formatBackend, err := formatMetaGetFormatBackendFS(rlk)wlk, err = lock.TryLockedOpenFile(fsFormatPath, os.O_RDWR, 0)
cmd/format-meta.go
formatConfigFile = "format.json"
每一个操作都会发送相应的通知,通知的目标(target)保存在一个 map 里,发送通知的代码在 cmd/notification.go:
func sendEvent(args eventArgs)globalNotificationSys.Send(args)func (sys *NotificationSys) Send(args eventArgs)targetIDSet := sys.bucketRulesMap[args.BucketName].Match(args.EventName, args.Object.Name)sys.targetList.Send(args.ToEvent(true), targetIDSet, sys.targetResCh)
internal/event/targetlist.go
type TargetList struct {sync.RWMutextargets map[TargetID]Target}
元文件的定义如下:cmd/fs-v1-metadata.go
type fsMetaV1 struct {Version string `json:"version"`// checksums of blocks on disk.Checksum FSChecksumInfoV1 `json:"checksum,omitempty"`// Metadata map for current object.Meta map[string]string `json:"meta,omitempty"`// parts info for current object - used in encryption.Parts []ObjectPartInfo `json:"parts,omitempty"`}
cmd/xl-storage-format-v1.go
// ObjectPartInfo describes a single part of an object, as encoded in JSON.
//
// ETag is omitted from the JSON output when empty; Number, Size and
// ActualSize are always emitted.
// NOTE(review): Size vs. ActualSize presumably distinguishes the stored size
// from the original (pre-transformation) size — confirm against callers.
type ObjectPartInfo struct {
	ETag       string `json:"etag,omitempty"`
	Number     int    `json:"number"`
	Size       int64  `json:"size"`
	ActualSize int64  `json:"actualSize"`
}
cmd/fs-v1-helpers.go
func fsStatDir(ctx context.Context, statDir string) (os.FileInfo, error)fi, err := fsStat(ctx, statDir)
分片上传相对复杂,代码路径在:cmd/fs-v1-multipart.go
func (fs *FSObjects) PutObjectPart(ctx context.Context, bucket, object, uploadID string, partID int, r *PutObjReader, opts ObjectOptions) (pi PartInfo, e error)if _, err := fs.statBucketDir(ctx, bucket); err != nil {uploadIDDir := fs.getUploadIDDir(bucket, object, uploadID)_, err := fsStatFile(ctx, pathJoin(uploadIDDir, fs.metaJSONFile))tmpPartPath := pathJoin(fs.fsPath, minioMetaTmpBucket, fs.fsUUID, uploadID+"."+mustGetUUID()+"."+strconv.Itoa(partID))bytesWritten, err := fsCreateFile(ctx, tmpPartPath, data, data.Size())defer fsRemoveFile(ctx, tmpPartPath)partPath := pathJoin(uploadIDDir, fs.encodePartFile(partID, etag, data.ActualSize()))err = fsSimpleRenameFile(ctx, tmpPartPath, partPath);go fs.backgroundAppend(ctx, bucket, object, uploadID)fi, err := fsStatFile(ctx, partPath)







