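Preface
This post summarizes Hudi's clean policies and analyzes how they are implemented at the source-code level. The previous post, "Hudi Clean 清理文件实现分析" (an analysis of how Hudi Clean removes files), walked through the overall Hudi Clean flow in the source, but did not dig into the policy-specific part that builds the list of files to delete. This post analyzes in detail how the KEEP_LATEST_COMMITS policy is implemented.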
getEarliestCommitToRetain
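Gets the earliest commit that must be retained.
KEEP_LATEST_FILE_VERSIONS: returns empty.
KEEP_LATEST_COMMITS: if the number of completed commits is less than or equal to hoodie.cleaner.commits.retained (default 10), returns empty; otherwise returns the commit that is hoodie.cleaner.commits.retained-th from the end of the timeline.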
/**
 * Returns earliest commit to retain based on cleaning policy.
 */
public Option<HoodieInstant> getEarliestCommitToRetain() {
  Option<HoodieInstant> earliestCommitToRetain = Option.empty();
  // Maximum number of commits to retain; default 10, configured via `hoodie.cleaner.commits.retained`
  int commitsRetained = config.getCleanerCommitsRetained();
  if (config.getCleanerPolicy() == HoodieCleaningPolicy.KEEP_LATEST_COMMITS
      && commitTimeline.countInstants() > commitsRetained) {
    // The policy is KEEP_LATEST_COMMITS and the total number of commits exceeds commitsRetained,
    // so the earliest commit to retain is the commitsRetained-th most recent one (10th from the end by default).
    // commitTimeline.instants is sorted by time ascending; see `HoodieTableMetaClient.scanHoodieInstantsFromFileSystem`.
    // Example: with commits 1, 2, 3 ... 11, `commitTimeline.countInstants() - commitsRetained` equals 1,
    // so commitTimeline.instants.get(1) is returned, i.e. `commit 2` (indexes start at 0), the 10th from the end.
    // When the policy is KEEP_LATEST_FILE_VERSIONS, empty is returned directly.
    earliestCommitToRetain = commitTimeline.nthInstant(commitTimeline.countInstants() - commitsRetained);
  }
  return earliestCommitToRetain;
}
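The index arithmetic above can be tried in isolation. The following is only a minimal sketch: EarliestCommitSketch and its method are hypothetical names, and plain timestamp strings stand in for HoodieInstant. It is not Hudi code.

```java
import java.util.List;
import java.util.Optional;

/** Hypothetical stand-in for the index arithmetic in getEarliestCommitToRetain; not Hudi code. */
final class EarliestCommitSketch {

  /**
   * @param commits         completed commit times, sorted oldest first (like commitTimeline)
   * @param commitsRetained number of most recent commits to keep (must be positive)
   */
  static Optional<String> earliestCommitToRetain(List<String> commits, int commitsRetained) {
    if (commitsRetained <= 0) {
      throw new IllegalArgumentException("commitsRetained must be positive");
    }
    if (commits.size() <= commitsRetained) {
      // Not enough commits yet, so there is nothing to clean.
      return Optional.empty();
    }
    // Index of the commitsRetained-th commit counted from the end of the list.
    return Optional.of(commits.get(commits.size() - commitsRetained));
  }
}
```

With eleven commits "001" to "011" and commitsRetained = 10, the index is 11 - 10 = 1 and the result is "002", matching the example in the comments above.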
getPartitionPathsToClean
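Returns the partition paths that need to be cleaned.
KEEP_LATEST_FILE_VERSIONS: calls getPartitionPathsForFullCleaning to get all partition paths.
KEEP_LATEST_COMMITS: if there is no earliest commit to retain, returns an empty list. When no .clean file exists, it calls getPartitionPathsForFullCleaning just like KEEP_LATEST_FILE_VERSIONS. When one exists (and incremental cleaning mode is enabled and that .clean recorded an earliestCommitToRetain), it calls getPartitionPathsForIncrementalCleaning to get only the partition paths that have become eligible for cleaning since the last clean.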
/**
 * Returns list of partitions where clean operations needs to be performed.
 *
 * @param earliestRetainedInstant New instant to be retained after this cleanup operation
 * @return list of partitions to scan for cleaning
 * @throws IOException when underlying file-system throws this exception
 */
public List<String> getPartitionPathsToClean(Option<HoodieInstant> earliestRetainedInstant) throws IOException {
  switch (config.getCleanerPolicy()) {
    case KEEP_LATEST_COMMITS:
      return getPartitionPathsForCleanByCommits(earliestRetainedInstant);
    case KEEP_LATEST_FILE_VERSIONS:
      return getPartitionPathsForFullCleaning();
    default:
      throw new IllegalStateException("Unknown Cleaner Policy");
  }
}
getPartitionPathsForCleanByCommits
/**
 * Return partition paths for cleaning by commits mode.
 * @param instantToRetain Earliest Instant to retain
 * @return list of partitions
 * @throws IOException
 */
private List<String> getPartitionPathsForCleanByCommits(Option<HoodieInstant> instantToRetain) throws IOException {
  if (!instantToRetain.isPresent()) {
    LOG.info("No earliest commit to retain. No need to scan partitions !!");
    return Collections.emptyList();
  }
  if (config.incrementalCleanerModeEnabled()) { // true by default
    // The last completed .clean instant
    Option<HoodieInstant> lastClean = hoodieTable.getCleanTimeline().filterCompletedInstants().lastInstant();
    if (lastClean.isPresent()) {
      // The last .clean exists; deserialize it into `HoodieCleanMetadata`
      HoodieCleanMetadata cleanMetadata = TimelineMetadataUtils
          .deserializeHoodieCleanMetadata(hoodieTable.getActiveTimeline().getInstantDetails(lastClean.get()).get());
      if ((cleanMetadata.getEarliestCommitToRetain() != null)
          && (cleanMetadata.getEarliestCommitToRetain().length() > 0)) {
        // The last .clean recorded an earliestCommitToRetain.
        // A successful clean serializes its earliestCommitToRetain into the .clean file.
        return getPartitionPathsForIncrementalCleaning(cleanMetadata, instantToRetain);
      }
    }
  }
  // instantToRetain is present and this is the first clean: call getPartitionPathsForFullCleaning.
  // This is the same logic as KEEP_LATEST_FILE_VERSIONS, except that KEEP_LATEST_FILE_VERSIONS
  // does not require instantToRetain to be present.
  return getPartitionPathsForFullCleaning();
}
getPartitionPathsForFullCleaning
/**
 * Scan and list all partitions for cleaning.
 * @return all partitions paths for the dataset.
 * @throws IOException
 */
private List<String> getPartitionPathsForFullCleaning() {
  // Go to brute force mode of scanning all partitions
  return FSUtils.getAllPartitionPaths(context, config.getMetadataConfig(), config.getBasePath());
}
// FSUtils.getAllPartitionPaths
public static List<String> getAllPartitionPaths(HoodieEngineContext engineContext, HoodieMetadataConfig metadataConfig,
                                                String basePathStr) {
  try (HoodieTableMetadata tableMetadata = HoodieTableMetadata.create(engineContext, metadataConfig, basePathStr,
      FileSystemViewStorageConfig.SPILLABLE_DIR.defaultValue())) {
    // By default this resolves to `FileSystemBackedTableMetadata.getAllPartitionPaths`
    return tableMetadata.getAllPartitionPaths();
  } catch (Exception e) {
    throw new HoodieException("Error fetching partition paths from metadata table", e);
  }
}
// FileSystemBackedTableMetadata.getAllPartitionPaths
@Override
public List<String> getAllPartitionPaths() throws IOException {
  if (assumeDatePartitioning) { // false by default
    FileSystem fs = new Path(datasetBasePath).getFileSystem(hadoopConf.get());
    return FSUtils.getAllPartitionFoldersThreeLevelsDown(fs, datasetBasePath);
  }
  List<Path> pathsToList = new LinkedList<>();
  pathsToList.add(new Path(datasetBasePath));
  List<String> partitionPaths = new ArrayList<>();
  while (!pathsToList.isEmpty()) {
    // TODO: Get the parallelism from HoodieWriteConfig
    int listingParallelism = Math.min(DEFAULT_LISTING_PARALLELISM, pathsToList.size());
    // List all directories in parallel:
    // list the first-level files and directories under every path in pathsToList
    List<Pair<Path, FileStatus[]>> dirToFileListing = engineContext.map(pathsToList, path -> {
      FileSystem fileSystem = path.getFileSystem(hadoopConf.get());
      return Pair.of(path, fileSystem.listStatus(path));
    }, listingParallelism);
    // Clear pathsToList before queuing the next level
    pathsToList.clear();
    // If the listing reveals a directory, add it to queue. If the listing reveals a hoodie partition, add it to
    // the results.
    dirToFileListing.forEach(p -> {
      // Look for the partition marker file `HOODIE_PARTITION_METAFILE`, i.e. .hoodie_partition_metadata
      Option<FileStatus> partitionMetaFile = Option.fromJavaOptional(Arrays.stream(p.getRight()).parallel()
          .filter(fs -> fs.getPath().getName().equals(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE))
          .findFirst());
      if (partitionMetaFile.isPresent()) {
        // Is a partition.
        // The marker file exists, so the directory being visited, p.getLeft(), is a partition path.
        // `getRelativePartitionPath` turns it into a relative path, which is added to partitionPaths.
        // For a table without partition fields, datasetBasePath itself is the partition and the relative path is "".
        // With partition fields, the relative path is `partitionField1=partitionField1Val/partitionField2=partitionField2Val/...`
        // (assuming Hive-style partition paths).
        String partitionName = FSUtils.getRelativePartitionPath(new Path(datasetBasePath), p.getLeft());
        partitionPaths.add(partitionName);
      } else {
        // Add sub-dirs to the queue.
        // No marker file here, so keep descending into sub-directories until a partition path is found
        // or every sub-directory has been visited. `METAFOLDER_NAME`, i.e. .hoodie, is excluded.
        // For a partitioned table this ends up visiting every partition path.
        pathsToList.addAll(Arrays.stream(p.getRight())
            .filter(fs -> fs.isDirectory() && !fs.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME))
            .map(fs -> fs.getPath())
            .collect(Collectors.toList()));
      }
    });
  }
  // Return the list of partition paths
  return partitionPaths;
}
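The traversal above can be approximated on a local file system. This is only a rough sketch under stated assumptions: PartitionListingSketch and listPartitions are hypothetical names, java.nio.file replaces the Hadoop FileSystem API, and directories are visited one at a time instead of level by level in parallel. The two file names are the values the comments above give for HOODIE_PARTITION_METAFILE and METAFOLDER_NAME.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.stream.Stream;

/** Hypothetical local-file-system analogue of the partition listing; not Hudi code. */
final class PartitionListingSketch {
  static final String PARTITION_METAFILE = ".hoodie_partition_metadata";
  static final String META_FOLDER = ".hoodie";

  static List<String> listPartitions(Path basePath) throws IOException {
    List<String> partitions = new ArrayList<>();
    Deque<Path> queue = new ArrayDeque<>();
    queue.add(basePath);
    while (!queue.isEmpty()) {
      Path dir = queue.poll();
      if (Files.exists(dir.resolve(PARTITION_METAFILE))) {
        // Marker file present: dir is a partition. Record its path relative to
        // the base path ("" when the base path itself is the partition).
        partitions.add(basePath.relativize(dir).toString());
        continue;
      }
      // No marker: queue the sub-directories, skipping the .hoodie metadata folder.
      try (Stream<Path> children = Files.list(dir)) {
        children.filter(Files::isDirectory)
            .filter(p -> !p.getFileName().toString().equals(META_FOLDER))
            .forEach(queue::add);
      }
    }
    return partitions;
  }
}
```

As in the original, the walk stops descending as soon as a directory contains the marker file, so only directories that are not partitions are ever expanded.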
getPartitionPathsForIncrementalCleaning
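Finds partition paths in incremental mode. It selects the commits whose time is greater than or equal to the earliestCommitToRetain saved by the last .clean and less than the current earliestCommitToRetain, i.e. the half-open interval [lastEarliestCommitToRetain, thisEarliestCommitToRetain). The partitions touched by the commits in that interval are the partitions to clean. They are read through replaceCommitMetadata.getPartitionToReplaceFileIds() and commitMetadata.getPartitionToWriteStats().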
/**
 * Use Incremental Mode for finding partition paths.
 *
 * @param cleanMetadata
 * @param newInstantToRetain
 * @return
 */
private List<String> getPartitionPathsForIncrementalCleaning(HoodieCleanMetadata cleanMetadata,
    Option<HoodieInstant> newInstantToRetain) {
  LOG.info("Incremental Cleaning mode is enabled. Looking up partition-paths that have since changed "
      + "since last cleaned at " + cleanMetadata.getEarliestCommitToRetain()
      + ". New Instant to retain : " + newInstantToRetain);
  // Keep the commits in [lastEarliestCommitToRetain, thisEarliestCommitToRetain): greater than or equal to the
  // earliestCommitToRetain saved by the last .clean and less than the current earliestCommitToRetain.
  // The partitions touched by those commits are the partitions to clean.
  return hoodieTable.getCompletedCommitsTimeline().getInstants().filter(
      instant -> HoodieTimeline.compareTimestamps(instant.getTimestamp(), HoodieTimeline.GREATER_THAN_OR_EQUALS,
          cleanMetadata.getEarliestCommitToRetain()) && HoodieTimeline.compareTimestamps(instant.getTimestamp(),
          HoodieTimeline.LESSER_THAN, newInstantToRetain.get().getTimestamp())).flatMap(instant -> {
            try {
              if (HoodieTimeline.REPLACE_COMMIT_ACTION.equals(instant.getAction())) {
                // The instant is a `REPLACE_COMMIT_ACTION`, i.e. `replacecommit`:
                // deserialize it as HoodieReplaceCommitMetadata and return every partition path it touches
                HoodieReplaceCommitMetadata replaceCommitMetadata = HoodieReplaceCommitMetadata.fromBytes(
                    hoodieTable.getActiveTimeline().getInstantDetails(instant).get(), HoodieReplaceCommitMetadata.class);
                return Stream.concat(replaceCommitMetadata.getPartitionToReplaceFileIds().keySet().stream(),
                    replaceCommitMetadata.getPartitionToWriteStats().keySet().stream());
              } else {
                // Otherwise deserialize it as HoodieCommitMetadata and return every partition path it touches
                HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
                    .fromBytes(hoodieTable.getActiveTimeline().getInstantDetails(instant).get(),
                        HoodieCommitMetadata.class);
                return commitMetadata.getPartitionToWriteStats().keySet().stream();
              }
            } catch (IOException e) {
              throw new HoodieIOException(e.getMessage(), e);
            }
          }).distinct().collect(Collectors.toList());
}
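The half-open interval filter can be illustrated without any Hudi types. In this minimal sketch, IncrementalPartitionsSketch is a hypothetical name and a sorted map from commit time to touched partitions stands in for the deserialized commit metadata. It assumes, as the timestamps in this post suggest, that instant times are fixed-width strings that can be compared lexicographically.

```java
import java.util.List;
import java.util.SortedMap;
import java.util.stream.Collectors;

/** Hypothetical illustration of the [last, new) commit window; not Hudi code. */
final class IncrementalPartitionsSketch {

  /**
   * @param commitToPartitions   commit time -> partitions written or replaced by that commit
   * @param lastEarliestRetained earliestCommitToRetain recorded by the previous clean
   * @param newEarliestRetained  earliestCommitToRetain computed for this clean
   */
  static List<String> partitionsToClean(SortedMap<String, List<String>> commitToPartitions,
                                        String lastEarliestRetained,
                                        String newEarliestRetained) {
    return commitToPartitions.entrySet().stream()
        // lastEarliestRetained <= commit < newEarliestRetained
        .filter(e -> e.getKey().compareTo(lastEarliestRetained) >= 0
            && e.getKey().compareTo(newEarliestRetained) < 0)
        .flatMap(e -> e.getValue().stream())
        .distinct()
        .collect(Collectors.toList());
  }
}
```

For example, with commits 001 -> [p1], 002 -> [p1, p2], 003 -> [p3], 004 -> [p4], a previous value of 002 and a new value of 004, only commits 002 and 003 fall in the window, so the result is [p1, p2, p3].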
getDeletePaths
Returns the list of files to be cleaned for the given partition path, based on the cleaning policy.
KEEP_LATEST_COMMITS: calls getFilesToCleanKeepingLatestCommits.
KEEP_LATEST_FILE_VERSIONS: calls getFilesToCleanKeepingLatestVersions.
/**
 * Returns files to be cleaned for the given partitionPath based on cleaning policy.
 */
public List<CleanFileInfo> getDeletePaths(String partitionPath) {
  HoodieCleaningPolicy policy = config.getCleanerPolicy();
  List<CleanFileInfo> deletePaths;
  if (policy == HoodieCleaningPolicy.KEEP_LATEST_COMMITS) {
    // Policy is `KEEP_LATEST_COMMITS`: call `getFilesToCleanKeepingLatestCommits`
    deletePaths = getFilesToCleanKeepingLatestCommits(partitionPath);
  } else if (policy == HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS) {
    deletePaths = getFilesToCleanKeepingLatestVersions(partitionPath);
  } else {
    throw new IllegalArgumentException("Unknown cleaning policy : " + policy.name());
  }
  LOG.info(deletePaths.size() + " patterns used to delete in partition path:" + partitionPath);
  return deletePaths;
}
getFilesToCleanKeepingLatestCommits
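1. Get all file groups (fileGroups) under the partition path partitionPath. A fileGroup is the set of files that share the same fileId.
2. Iterate over fileGroups and, for each fileGroup, get all of its file slices (the data/parquet files), fileSliceList.
3. Iterate over fileSliceList and check each slice. First skip the slice if it is a savepointedFile, the latest version, or the most recent version before earliestCommitToRetain. Then, if the slice is not needed by a pending compaction and fileCommitTime < earliestCommitToRetain, add its files to deletePaths, the returned list of files to delete.
In other words, clean never removes savepointed files, the latest version, the most recent version before earliestCommitToRetain, or files that must be kept for a pending compaction. Every other file whose commit time is earlier than earliestCommitToRetain is returned for deletion.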
private List<CleanFileInfo> getFilesToCleanKeepingLatestCommits(String partitionPath) {
  // Number of commits to retain, default 10
  int commitsRetained = config.getCleanerCommitsRetained();
  LOG.info("Cleaning " + partitionPath + ", retaining latest " + commitsRetained + " commits. ");
  // Paths to delete, of type `CleanFileInfo`
  List<CleanFileInfo> deletePaths = new ArrayList<>();
  // Collect all the datafiles savepointed by all the savepoints
  List<String> savepointedFiles = hoodieTable.getSavepoints().stream()
      .flatMap(this::getSavepointedDataFiles)
      .collect(Collectors.toList());
  // determine if we have enough commits, to start cleaning.
  if (commitTimeline.countInstants() > commitsRetained) { // more commits than commitsRetained
    // Earliest commit that must be retained
    Option<HoodieInstant> earliestCommitToRetainOption = getEarliestCommitToRetain();
    HoodieInstant earliestCommitToRetain = earliestCommitToRetainOption.get();
    // all replaced file groups before earliestCommitToRetain are eligible to clean
    deletePaths.addAll(getReplacedFilesEligibleToClean(savepointedFiles, partitionPath, earliestCommitToRetainOption));
    // add active files
    // All file groups; put simply, files with the same fileId form one group
    List<HoodieFileGroup> fileGroups = fileSystemView.getAllFileGroups(partitionPath).collect(Collectors.toList());
    for (HoodieFileGroup fileGroup : fileGroups) { // iterate over fileGroups
      // All file slices (data/parquet files) of this fileGroup
      List<FileSlice> fileSliceList = fileGroup.getAllFileSlices().collect(Collectors.toList());
      if (fileSliceList.isEmpty()) {
        continue;
      }
      // Latest version in this fileGroup
      String lastVersion = fileSliceList.get(0).getBaseInstantTime();
      // Most recent version before earliestCommitToRetain; it may still be used by queries
      String lastVersionBeforeEarliestCommitToRetain =
          getLatestVersionBeforeCommit(fileSliceList, earliestCommitToRetain);
      // Ensure there are more than 1 version of the file (we only clean old files from updates)
      // i.e always spare the last commit.
      for (FileSlice aSlice : fileSliceList) {
        Option<HoodieBaseFile> aFile = aSlice.getBaseFile();
        String fileCommitTime = aSlice.getBaseInstantTime();
        if (aFile.isPresent() && savepointedFiles.contains(aFile.get().getFileName())) {
          // do not clean up a savepoint data file
          continue;
        }
        // Dont delete the latest commit and also the last commit before the earliest commit we
        // are retaining.
        // The window of commit retain == max query run time. So a query could be running which
        // still uses this file.
        if (fileCommitTime.equals(lastVersion) || (fileCommitTime.equals(lastVersionBeforeEarliestCommitToRetain))) {
          // move on to the next file
          continue;
        }
        // Always keep the last commit
        // The slice is not needed by a pending `compaction` && earliestCommitToRetain > fileCommitTime
        if (!isFileSliceNeededForPendingCompaction(aSlice) && HoodieTimeline
            .compareTimestamps(earliestCommitToRetain.getTimestamp(), HoodieTimeline.GREATER_THAN, fileCommitTime)) {
          // this is a commit, that should be cleaned.
          aFile.ifPresent(hoodieDataFile -> {
            // Add the file to deletePaths
            deletePaths.add(new CleanFileInfo(hoodieDataFile.getPath(), false));
            if (hoodieDataFile.getBootstrapBaseFile().isPresent() && config.shouldCleanBootstrapBaseFile()) {
              deletePaths.add(new CleanFileInfo(hoodieDataFile.getBootstrapBaseFile().get().getPath(), true));
            }
          });
          if (hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) {
            // If merge on read, then clean the log files for the commits as well
            deletePaths.addAll(aSlice.getLogFiles().map(lf -> new CleanFileInfo(lf.getPath().toString(), false))
                .collect(Collectors.toList()));
          }
        }
      }
    }
  }
  // Return the list of files to delete
  return deletePaths;
}
/**
 * Gets the latest version < instantTime. This version file could still be used by queries.
 *
 * instantTime: earliestCommitToRetain
 *
 * For an older fileGroup that has not been written to recently, the version returned here is the
 * fileGroup's latest version, because that latest version is already earlier than earliestCommitToRetain.
 */
private String getLatestVersionBeforeCommit(List<FileSlice> fileSliceList, HoodieInstant instantTime) {
  // fileSliceList is ordered internally (reverseOrder): slices are sorted by time, newest first
  for (FileSlice file : fileSliceList) {
    String fileCommitTime = file.getBaseInstantTime();
    if (HoodieTimeline.compareTimestamps(instantTime.getTimestamp(), HoodieTimeline.GREATER_THAN, fileCommitTime)) {
      // fileList is sorted on the reverse, so the first commit we find < instantTime is the
      // one we want
      // Stop at the first slice with instantTime > fileCommitTime and return its fileCommitTime
      return fileCommitTime;
    }
  }
  // There is no version of this file which is < instantTime
  return null;
}
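To make the retention rule concrete, here is a minimal sketch of the per-fileGroup decision. KeepLatestCommitsSketch and versionsToDelete are hypothetical names, versions are plain timestamp strings, and savepoints, pending compaction, log files and replaced file groups are deliberately left out.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified model of the KEEP_LATEST_COMMITS rule for one file group; not Hudi code. */
final class KeepLatestCommitsSketch {

  /**
   * @param versions               base instant times of one file group, newest first
   * @param earliestCommitToRetain earliest commit time that must stay queryable
   */
  static List<String> versionsToDelete(List<String> versions, String earliestCommitToRetain) {
    List<String> toDelete = new ArrayList<>();
    if (versions.isEmpty()) {
      return toDelete;
    }
    String latest = versions.get(0);
    // Newest version strictly older than earliestCommitToRetain (null if there is none).
    String lastBefore = null;
    for (String v : versions) {
      if (v.compareTo(earliestCommitToRetain) < 0) {
        lastBefore = v;
        break;
      }
    }
    for (String v : versions) {
      if (v.equals(latest) || v.equals(lastBefore)) {
        continue; // always spared
      }
      if (v.compareTo(earliestCommitToRetain) < 0) {
        toDelete.add(v); // older than the retention window and not spared
      }
    }
    return toDelete;
  }
}
```

With versions [011, 008, 005, 003, 001] and earliestCommitToRetain = 006, versions 011 and 008 are inside the window, 005 is spared as the last version before the window, and the result is [003, 001]. For an idle file group [003, 001], the latest version 003 is also the last version before the window, so only 001 is returned.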
private List<CleanFileInfo> getReplacedFilesEligibleToClean(List<String> savepointedFiles, String partitionPath, Option<HoodieInstant> earliestCommitToRetain) {
  final Stream<HoodieFileGroup> replacedGroups;
  // With the `KEEP_LATEST_COMMITS` policy, earliestCommitToRetain is always present
  if (earliestCommitToRetain.isPresent()) {
    // fileSystemView is an `org.apache.hudi.common.table.view.PriorityBasedFileSystemView`
    replacedGroups = fileSystemView.getReplacedFileGroupsBefore(earliestCommitToRetain.get().getTimestamp(), partitionPath);
  } else {
    replacedGroups = fileSystemView.getAllReplacedFileGroups(partitionPath);
  }
  return replacedGroups.flatMap(HoodieFileGroup::getAllFileSlices)
      // do not delete savepointed files (archival will make sure corresponding replacecommit file is not deleted)
      .filter(slice -> !slice.getBaseFile().isPresent() || !savepointedFiles.contains(slice.getBaseFile().get().getFileName()))
      .flatMap(slice -> getCleanFileInfoForSlice(slice).stream())
      .collect(Collectors.toList());
}
/**
 * For the clean operation:
 * maxCommitTime  the earliest commit to retain
 * partitionPath  relative partition path
 * preferredView  org.apache.hudi.common.table.view.RemoteHoodieTableFileSystemView
 * secondaryView  org.apache.hudi.common.table.view.HoodieTableFileSystemView
 */
public Stream<HoodieFileGroup> getReplacedFileGroupsBefore(String maxCommitTime, String partitionPath) {
  return execute(maxCommitTime, partitionPath, preferredView::getReplacedFileGroupsBefore, secondaryView::getReplacedFileGroupsBefore);
}
private <T1, T2, R> R execute(T1 val, T2 val2, Function2<T1, T2, R> preferredFunction,
    Function2<T1, T2, R> secondaryFunction) {
  if (errorOnPreferredView) { // false by default
    LOG.warn("Routing request to secondary file-system view");
    return secondaryFunction.apply(val, val2);
  } else {
    try {
      /**
       * For getReplacedFileGroupsBefore:
       * this calls `preferredView::getReplacedFileGroupsBefore`,
       * i.e. `RemoteHoodieTableFileSystemView.getReplacedFileGroupsBefore`
       */
      return preferredFunction.apply(val, val2);
    } catch (RuntimeException re) {
      LOG.error("Got error running preferred function. Trying secondary", re);
      errorOnPreferredView = true;
      return secondaryFunction.apply(val, val2);
    }
  }
}
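The execute method is a sticky fallback: after the preferred view fails once, every later call goes straight to the secondary view. A minimal single-argument sketch of that pattern follows. FallbackSketch is a hypothetical name, and java.util.function.Function stands in for Hudi's Function2.

```java
import java.util.function.Function;

/** Hypothetical single-argument version of the preferred/secondary routing; not Hudi code. */
final class FallbackSketch<T, R> {
  private final Function<T, R> preferred;
  private final Function<T, R> secondary;
  // Set once the preferred function has failed; never reset in this sketch.
  private boolean errorOnPreferred = false;

  FallbackSketch(Function<T, R> preferred, Function<T, R> secondary) {
    this.preferred = preferred;
    this.secondary = secondary;
  }

  R execute(T arg) {
    if (errorOnPreferred) {
      // A previous call failed: route directly to the secondary function.
      return secondary.apply(arg);
    }
    try {
      return preferred.apply(arg);
    } catch (RuntimeException re) {
      // Remember the failure and answer this call from the secondary function.
      errorOnPreferred = true;
      return secondary.apply(arg);
    }
  }
}
```

If the preferred function throws on the first call, the second call never touches it again. In the context above, that would mean a single remote-view failure moves all later requests to the local HoodieTableFileSystemView.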
// RemoteHoodieTableFileSystemView.getReplacedFileGroupsBefore
@Override
public Stream<HoodieFileGroup> getReplacedFileGroupsBefore(String maxCommitTime, String partitionPath) {
  /**
   * HashMap
   * "partition" -> "partitionPath"
   * "maxinstant" -> "earliestCommitToRetain"
   * "basepath" -> "basepath"
   */
  Map<String, String> paramsMap = getParamsWithAdditionalParam(partitionPath, MAX_INSTANT_PARAM, maxCommitTime);
  try {
    // This is implemented as an HTTP GET against the endpoint `/v1/hoodie/view/filegroups/replaced/before/`.
    // The endpoint is registered in class `RequestHandler`, in the `registerFileSlicesAPI` method.
    // I do not fully understand this part yet. In every case I have seen so far it returns an empty list;
    // I will look into it in more depth later.
    List<FileGroupDTO> fileGroups = executeRequest(ALL_REPLACED_FILEGROUPS_BEFORE, paramsMap,
        new TypeReference<List<FileGroupDTO>>() {}, RequestMethod.GET);
    return fileGroups.stream().map(dto -> FileGroupDTO.toFileGroup(dto, metaClient));
  } catch (IOException e) {
    throw new HoodieRemoteException(e);
  }
}
getFilesToCleanKeepingLatestVersions
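1. Iterate over fileGroups.
2. For each fileGroup, get all of its file slices (data/parquet files), filtering out the ones that must be kept for a pending compaction: fileSliceIterator.
3. If the fileGroup is in PendingCompaction, decrement the number of versions to keep by one.
4. Iterate over fileSliceIterator, keep that many of the newest versions, and add everything else to the returned list of files to delete.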
/**
 * Selects the older versions of files for cleaning, such that it bounds the number of versions of each file. This
 * policy is useful, if you are simply interested in querying the table, and you don't want too many versions for a
 * single file (i.e run it with versionsRetained = 1)
 */
private List<CleanFileInfo> getFilesToCleanKeepingLatestVersions(String partitionPath) {
  LOG.info("Cleaning " + partitionPath + ", retaining latest " + config.getCleanerFileVersionsRetained()
      + " file versions. ");
  // Paths to delete, of type `CleanFileInfo`
  List<CleanFileInfo> deletePaths = new ArrayList<>();
  // Collect all the datafiles savepointed by all the savepoints
  List<String> savepointedFiles = hoodieTable.getSavepoints().stream()
      .flatMap(this::getSavepointedDataFiles)
      .collect(Collectors.toList());
  // In this scenario, we will assume that once replaced a file group automatically becomes eligible for cleaning completely
  // In other words, the file versions only apply to the active file groups.
  deletePaths.addAll(getReplacedFilesEligibleToClean(savepointedFiles, partitionPath, Option.empty()));
  // All file groups; put simply, files with the same fileId form one group
  List<HoodieFileGroup> fileGroups = fileSystemView.getAllFileGroups(partitionPath).collect(Collectors.toList());
  for (HoodieFileGroup fileGroup : fileGroups) { // iterate over fileGroups
    // Number of file versions to keep, configured via `hoodie.cleaner.fileversions.retained`, default 3
    int keepVersions = config.getCleanerFileVersionsRetained();
    // do not cleanup slice required for pending compaction
    // All file slices of this fileGroup, minus the ones that must be kept for a pending `compaction`
    Iterator<FileSlice> fileSliceIterator =
        fileGroup.getAllFileSlices().filter(fs -> !isFileSliceNeededForPendingCompaction(fs)).iterator();
    if (isFileGroupInPendingCompaction(fileGroup)) { // the fileGroup is in PendingCompaction
      // We have already saved the last version of file-groups for pending compaction Id,
      // so one fewer version needs to be kept here
      keepVersions--;
    }
    // Walk past the versions that must be kept
    while (fileSliceIterator.hasNext() && keepVersions > 0) { // versions remain and keepVersions > 0
      // Skip this most recent version
      FileSlice nextSlice = fileSliceIterator.next();
      Option<HoodieBaseFile> dataFile = nextSlice.getBaseFile();
      if (dataFile.isPresent() && savepointedFiles.contains(dataFile.get().getFileName())) {
        // do not clean up a savepoint data file
        continue;
      }
      // One fewer version left to keep
      keepVersions--;
    }
    // Delete the remaining files
    while (fileSliceIterator.hasNext()) {
      FileSlice nextSlice = fileSliceIterator.next();
      deletePaths.addAll(getCleanFileInfoForSlice(nextSlice));
    }
  }
  return deletePaths;
}
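The two loops above can be modelled with plain strings. This is a minimal sketch, not Hudi code: KeepLatestVersionsSketch and versionsToDelete are hypothetical names, versions stand in for file slices, and the pending-compaction filtering and replaced file groups are omitted.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

/** Hypothetical, simplified model of the KEEP_LATEST_FILE_VERSIONS loops for one file group; not Hudi code. */
final class KeepLatestVersionsSketch {

  /**
   * @param versions     versions of one file group, newest first
   * @param keepVersions number of newest versions to keep
   * @param savepointed  versions referenced by a savepoint
   */
  static List<String> versionsToDelete(List<String> versions, int keepVersions, Set<String> savepointed) {
    List<String> toDelete = new ArrayList<>();
    Iterator<String> it = versions.iterator();
    int remaining = keepVersions;
    // First loop: walk past the versions to keep.
    while (it.hasNext() && remaining > 0) {
      String v = it.next();
      if (savepointed.contains(v)) {
        continue; // a savepointed version is skipped without using up a slot
      }
      remaining--;
    }
    // Second loop: as in the quoted code, everything left on the iterator is returned.
    while (it.hasNext()) {
      toDelete.add(it.next());
    }
    return toDelete;
  }
}
```

With versions [5, 4, 3, 2, 1] and keepVersions = 3, the result is [2, 1]. If version 4 is savepointed, it does not count towards the three kept versions, so 5, 3 and 2 are kept and only [1] is returned.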
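Summary
Similarities
Whatever the policy, the code first iterates over fileGroups and then over each fileGroup's FileSlices. Neither policy deletes savepoint or PendingCompaction files, and both keep the latest version of each file.
Differences
KEEP_LATEST_COMMITS: retains the most recent commits. On top of the files listed under Similarities, it keeps one extra version: the most recent version before earliestCommitToRetain, obtained through getLatestVersionBeforeCommit. I do not think this file needs to be kept and have opened a PR, https://github.com/apache/hudi/pull/5406, though I am not sure my reasoning is right.
For an older fileGroup with no recent writes, only the single latest version is kept: its latest version is earlier than earliestCommitToRetain, so the file returned by getLatestVersionBeforeCommit is that same latest version.
KEEP_LATEST_FILE_VERSIONS: every fileGroup keeps the same number of versions.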




