CurveFS Client Warmup 介绍
Curve 是云原生计算基金会 (CNCF) Sandbox 项目,是网易主导自研和开源的高性能、易运维、云原生的分布式存储系统。
添加 Warmup 任务;首先必须要实现 Warmup 任务的添加。预热任务分为:单预热任务和多预热任务。单预热任务指的是预热单文件或单目录;多预热任务指的是根据指定的文件内容预热一系列的文件。
查找 Warmup 任务;查找预热任务,给出指定预热任务的进度。
停止 Warmup 任务;顾名思义,停止指定的预热任务。
列出 Warmup 任务;列出所有的预热任务。
Curve 预热流程分析
目前 CurveFS Client 已经实现 Warmup 功能,如果要实现 WarmupManager 的功能需要分析目前实现的 Warmup 的流程。
单个文件的预热流程,与解析预热列表文件后对其中每个文件的预热流程相同,因此这里仅分析预热列表文件的预热流程。
列表文件预热流程
switch (curvefs::client::common::GetWarmupType(type)) {case curvefs::client::common::WarmupType::kWarmupTypeList:g_ClientInstance->PutWarmTask(path);break;...}
std::unique_lock<std::mutex> lck(warmUpTaskMtx_);warmUpTasks_.push_back(warmUpTask);WarmUpRun();
while (hasWarmTask()) {std::string warmUpTask;GetwarmTask(&warmUpTask);...Dentry dentry;CURVEFS_ERROR ret = dentryManager_->GetDentry(fsInfo_->rootinodeid(), pToken, &dentry);...fuse_ino_t ino = dentry.inodeid();std::shared_ptr<InodeWrapper> inodeWrapper;ret = inodeManager_->GetInode(ino, inodeWrapper);...uint64_t len = inodeWrapper->GetLength();WarmUpFileContext_t warmUpFile{ino, len, true};SetWarmUpFile(warmUpFile);}
void FuseS3Client::BackGroundFetch() {while (!bgFetchStop_.load(std::memory_order_acquire)) {if (hasWarmUpTask()) { // new warmup taskWarmUpFileContext_t warmUpFile;GetWarmUpFile(&warmUpFile);std::vector<std::string> warmUpFilelist;GetWarmUpFileList(warmUpFile, warmUpFilelist);for (auto filePath : warmUpFilelist) {FetchDentryEnqueue(filePath);}}{ // file need warmupstd::list<fuse_ino_t> readAheadFiles;readAheadFiles.swap(GetReadAheadFiles());for (auto iter : readAheadFiles) {fetchDataEnqueue(iter);}}usleep(WARMUP_CHECKINTERVAL_US);}return;}
根(/):FetchChildDentryEnqueue
根目录的子目录 FetchDentry
其他 FetchDentry
if (splitPath.size() == 1 && isRoot) {FetchChildDentryEnqueue(fsInfo_->rootinodeid());return;} else if (splitPath.size() == 1) {this->FetchDentry(fsInfo_->rootinodeid(), splitPath[0]);return;} else if (splitPath.size() > 1) { // travel pathsplitPath.pop_back();fuse_ino_t ino = fsInfo_->rootinodeid();for (auto iter : splitPath) {Dentry dentry;std::string pathName = iter;CURVEFS_ERROR ret = dentryManager_->GetDentry(ino, pathName, &dentry);if (ret != CURVEFS_ERROR::OK) {...return;}ino = dentry.inodeid();}this->FetchDentry(ino, lastName);return;}
目录,执行 FetchChildDentryEnqueue
S3 文件,将 inodeid 添加到 readAheadFiles_
CURVEFS_ERROR ret = dentryManager_->GetDentry(ino, file, &dentry);...if (FsFileType::TYPE_S3 == dentry.type()) {std::unique_lock<std::mutex> lck(fetchMtx_);readAheadFiles_.push_front(dentry.inodeid());return;} else if (FsFileType::TYPE_DIRECTORY == dentry.type()) {FetchChildDentryEnqueue(dentry.inodeid());return;}
目录 执行 FetchChildDentryEnqueue
S3 将 inodeid 添加到 readAheadFiles_
CURVEFS_ERROR ret = dentryManager_->ListDentry(ino, &dentryList, limit);...for (auto iter : dentryList) {if (FsFileType::TYPE_S3 == iter.type()) {std::unique_lock<std::mutex> lck(fetchMtx_);readAheadFiles_.push_front(iter.inodeid());} else if (FsFileType::TYPE_DIRECTORY == iter.type()) {FetchChildDentryEnqueue(iter.inodeid());}...}
std::list<fuse_ino_t> readAheadFiles;readAheadFiles.swap(GetReadAheadFiles());for (auto iter : readAheadFiles) {fetchDataEnqueue(iter);}
auto task = [this, ino]() {std::shared_ptr<InodeWrapper> inodeWrapper;CURVEFS_ERROR ret = inodeManager_->GetInode(ino, inodeWrapper);...google::protobuf::Map<uint64_t, S3ChunkInfoList> *s3ChunkInfoMap= nullptr;{::curve::common::UniqueLock lgGuard = inodeWrapper->GetUniqueLock();s3ChunkInfoMap = inodeWrapper->GetChunkInfoMap();}if (nullptr == s3ChunkInfoMap ||s3ChunkInfoMap->empty()) {return;}travelChunks(ino, s3ChunkInfoMap);};GetTaskFetchPool().Enqueue(task);
for (auto &iter : *s3ChunkInfoMap) {travelChunk(ino, iter.second, &prefetchObjs);}WarmUpAllObjs(prefetchObjs);
GetObjectAsyncCallBack cb =[&](const S3Adapter *adapter,const std::shared_ptr<GetObjectAsyncContext> &context) {if (context->retCode == 0) {int ret = s3Adaptor_->GetDiskCacheManager()->WriteReadDirect(context->key, context->buf, context->len);...if (pendingReq.fetch_sub(1, std::memory_order_seq_cst) == 1) {cond.Signal();}delete []context->buf;return;}s3Adaptor_->GetS3Client()->DownloadAsync(context);};for (auto iter : prefetchObjs) {std::string name = iter.first;uint64_t readLen = iter.second;if (s3Adaptor_->GetDiskCacheManager()->IsCached(name)) {pendingReq.fetch_sub(1);continue;}char *cacheS3 = new char[readLen];memset(cacheS3, 0, readLen);auto context = std::make_shared<GetObjectAsyncContext>();...s3Adaptor_->GetS3Client()->DownloadAsync(context);}
Curve Warmup Manager
新增数据结构
class WarmupFile { // 预热文件fuse_ino_t key_;uint64_t fileLen_;};using warmupFilelist = WarmupFile; // 预热列表文件。即文件中保存的是要预热的文件列表class WarmupInodes {fuse_ino_t key_;std::set<fuse_ino_t> readAheadFiles_;};class WarmupObjs {fuse_ino_t key;std::list<std::pair<std::string, uint64_t>> prefetchObjs;};class WarmupProcess {uint64_t total;uint64_t done;}class WarmupManager {// warmup progressstd::unordered_map<fuse_ino_t, std::shared_ptr<WarmupProgress>>inode2Progress_;BthreadRWLock inode2ProgressMutex_;std::deque<WarmupFilelist> warmupFilelistDeque_;mutable RWLock warmupFilelistDequeMutex_;std::unordered_map<fuse_ino_t, std::unique_ptr<ThreadPool>>inode2FetchDentryPool_;BthreadRWLock inode2FetchDentryPoolMutex_;std::deque<WarmupInodes> warmupInodesDeque_;mutable RWLock warmupInodesDequeMutex_;std::unordered_map<fuse_ino_t, std::unique_ptr<ThreadPool>>inode2FetchS3ObjectsPool_;BthreadRWLock inode2FetchS3ObjectsPoolMutex_;};
整体方案

if (AddWarmupProcess(key)) {WriteLockGuard lock(warmupFilelistDequeMutex_);auto iter = FindKeyWarmupFilelistLocked(key);if (iter == warmupFilelistDeque_.end()) {std::shared_ptr<InodeWrapper> inodeWrapper;CURVEFS_ERROR ret = inodeManager_->GetInode(key, inodeWrapper);if (ret != CURVEFS_ERROR::OK) {return false;}uint64_t len = inodeWrapper->GetLength();warmupFilelistDeque_.emplace_back(key, len);}}
void WarmupManagerS3Impl::ScanWarmupFilelist() {WriteLockGuard lock(warmupFilelistDequeMutex_);if (!warmupFilelistDeque_.empty()) {WarmupFilelist warmupFilelist = warmupFilelistDeque_.front();std::vector<std::string> warmuplist;GetWarmupList(warmupFilelist, &warmuplist);for (auto filePath : warmuplist) {FetchDentryEnqueue(warmupFilelist.GetKey(), filePath);}warmupFilelistDeque_.pop_front();}}void WarmupManagerS3Impl::FetchDentryEnqueue(fuse_ino_t key,const std::string &file) {auto task = [this, key, file]() { LookPath(key, file); };AddFetchDentryTask(key, task);}
if (FsFileType::TYPE_S3 == dentry.type()) {WriteLockGuard lock(warmupInodesDequeMutex_);auto iterDeque = FindKeyWarmupInodesLocked(key);if (iterDeque == warmupInodesDeque_.end()) {warmupInodesDeque_.emplace_back(key, std::set<fuse_ino_t>{dentry.inodeid()});} else {iterDeque->AddFileInode(dentry.inodeid());}return;}
然后扫描线程(BackGroundFetch)中的 ScanWarmupFiles 会扫描预热任务队列,一旦发现有要预热文件的 inode,使用 FetchDataEnqueue 来解析需要预热文件的 S3 对象。后面预热的流程和解析 dentry 类似,不再赘述。欢迎入群一起交流存储在 AI 场景的最佳实践。

<作者:程义, CurveAdm maintainer, CurveFS Committer>
🔥 火爆报名中:


关于 Curve
Curve 亦可作为云存储中间件使用 S3 兼容的对象存储作为数据存储引擎,为公有云用户提供高性价比的共享文件存储。
GitHub:https://github.com/opencurve/curve 官网:https://opencurve.io/ 用户论坛:https://ask.opencurve.io/ 微信群:搜索群助手微信号 OpenCurve_bot




