暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Hudi Clean Policy 清理策略实现分析

伦少的博客 2022-05-29
1646

前言

总结Hudi Clean Policy清理策略,从源码层面分析如何实现,上一篇文章Hudi Clean 清理文件实现分析从源码层面分析总结了Hudi Clean的整体流程,但是对于和策略有关的获取要删除的文件列表部分没有深入分析,这一篇详细分析KEEP_LATEST_COMMITS
策略是如何实现的。

getEarliestCommitToRetain

获取最早需要保留的commit
KEEP_LATEST_FILE_VERSIONS
: 返回空
KEEP_LATEST_COMMITS
:如果目前完成状态的commit数小于等于hoodie.cleaner.commits.retained
,则返回空,否则,返回倒数第hoodie.cleaner.commits.retained
(默认10)个.commit

  /**
   * Returns earliest commit to retain based on cleaning policy.
   * 根据清理策略返回最早的需要保留的commit
   */

  public Option<HoodieInstant> getEarliestCommitToRetain() {
    Option<HoodieInstant> earliestCommitToRetain = Option.empty();
    // 获取最大保留commit次数,默认值10,配置参数为`hoodie.cleaner.commits.retained`
    int commitsRetained = config.getCleanerCommitsRetained();
    if (config.getCleanerPolicy() == HoodieCleaningPolicy.KEEP_LATEST_COMMITS
        && commitTimeline.countInstants() > commitsRetained) {
      // 如果清理策略为KEEP_LATEST_COMMITS,且commit的总数大于commitsRetained
      // 那么最早需要保留的commit为最新的第10个commit,也就倒数第10个
      // commitTimeline.instants是按照时间从小到大排序的,具体实现在`HoodieTableMetaClient.scanHoodieInstantsFromFileSystem`
      // 假如commit为1、2、3...11,那么`commitTimeline.countInstants() - commitsRetained`等于1,
      // 也就是返回commitTimeline.instants.get(1),返回`commit 2`(下标从0开始),也就是倒数第10个
      // 当策略为KEEP_LATEST_FILE_VERSIONS,直接返回空
      earliestCommitToRetain = commitTimeline.nthInstant(commitTimeline.countInstants() - commitsRetained);
    }
    return earliestCommitToRetain;
  }

getPartitionPathsToClean

返回需要清理的分区路径

KEEP_LATEST_FILE_VERSIONS
:调用getPartitionPathsForFullCleaning
获取所有的分区路径
KEEP_LATEST_COMMITS
:当不存在.clean文件时,和KEEP_LATEST_FILE_VERSIONS
策略一样调用getPartitionPathsForFullCleaning
,当存在时调用getPartitionPathsForIncrementalCleaning获取自上一个clean到现在为止新增的需要清理的分区路径

  /**
   * Returns list of partitions where clean operations needs to be performed.
   * 返回清理操作需要执行的分区列表
   *
   * @param earliestRetainedInstant New instant to be retained after this cleanup operation
   * @return list of partitions to scan for cleaning
   * @throws IOException when underlying file-system throws this exception
   */

  public List<String> getPartitionPathsToClean(Option<HoodieInstant> earliestRetainedInstant) throws IOException {
    switch (config.getCleanerPolicy()) {
      case KEEP_LATEST_COMMITS:
        return getPartitionPathsForCleanByCommits(earliestRetainedInstant);
      case KEEP_LATEST_FILE_VERSIONS:
        return getPartitionPathsForFullCleaning();
      default:
        throw new IllegalStateException("Unknown Cleaner Policy");
    }
  }

getPartitionPathsForCleanByCommits

  /**
   * Return partition paths for cleaning by commits mode.
   * @param instantToRetain Earliest Instant to retain
   * @return list of partitions
   * @throws IOException
   */

  private List<String> getPartitionPathsForCleanByCommits(Option<HoodieInstant> instantToRetain) throws IOException {
    if (!instantToRetain.isPresent()) {
      LOG.info("No earliest commit to retain. No need to scan partitions !!");
      return Collections.emptyList();
    }

    if (config.incrementalCleanerModeEnabled()) { // 默认true
      // 最后一个.clean
      Option<HoodieInstant> lastClean = hoodieTable.getCleanTimeline().filterCompletedInstants().lastInstant();
      if (lastClean.isPresent()) {
        // 最后一个.clean存在,反序列生成`HoodieCleanMetadata`
        HoodieCleanMetadata cleanMetadata = TimelineMetadataUtils
            .deserializeHoodieCleanMetadata(hoodieTable.getActiveTimeline().getInstantDetails(lastClean.get()).get());
        if ((cleanMetadata.getEarliestCommitToRetain() != null)
            && (cleanMetadata.getEarliestCommitToRetain().length() > 0)) {
          // 如果最后一个.clean存在earliestCommitToRetain
          // 如果上一次clean操作执行成功的话会将上一次的earliestCommitToRetain序列化到.clean文件中
          return getPartitionPathsForIncrementalCleaning(cleanMetadata, instantToRetain);
        }
      }
    }
    // 当instantToRetain不为空且是第一次clean时,调用getPartitionPathsForFullCleaning获取分区路径
    // 这时和策略为KEEP_LATEST_FILE_VERSIONS时的逻辑一样,只不过KEEP_LATEST_FILE_VERSIONS不用instantToRetain非空
    return getPartitionPathsForFullCleaning();
  }

getPartitionPathsForFullCleaning

  /**
   * Scan and list all partitions for cleaning.
   * @return all partitions paths for the dataset.
   * @throws IOException
   */

  private List<String> getPartitionPathsForFullCleaning() {
    // Go to brute force mode of scanning all partitions
    return FSUtils.getAllPartitionPaths(context, config.getMetadataConfig(), config.getBasePath());
  }

  public static List<String> getAllPartitionPaths(HoodieEngineContext engineContext, HoodieMetadataConfig metadataConfig,
                                                  String basePathStr)
 
{
    try (HoodieTableMetadata tableMetadata = HoodieTableMetadata.create(engineContext, metadataConfig, basePathStr,
        FileSystemViewStorageConfig.SPILLABLE_DIR.defaultValue())) {
      // 默认`FileSystemBackedTableMetadata.getAllPartitionPaths`
      return tableMetadata.getAllPartitionPaths();
    } catch (Exception e) {
      throw new HoodieException("Error fetching partition paths from metadata table", e);
    }
  } 

    @Override
  public List<String> getAllPartitionPaths() throws IOException {
    if (assumeDatePartitioning) { // 默认false
      FileSystem fs = new Path(datasetBasePath).getFileSystem(hadoopConf.get());
      return FSUtils.getAllPartitionFoldersThreeLevelsDown(fs, datasetBasePath);
    }

    List<Path> pathsToList = new LinkedList<>();
    pathsToList.add(new Path(datasetBasePath));
    List<String> partitionPaths = new ArrayList<>();

    while (!pathsToList.isEmpty()) {
      // TODO: Get the parallelism from HoodieWriteConfig
      int listingParallelism = Math.min(DEFAULT_LISTING_PARALLELISM, pathsToList.size());

      // List all directories in parallel
      // 列出pathsToList里path的所有的一级文件及路径
      List<Pair<Path, FileStatus[]>> dirToFileListing = engineContext.map(pathsToList, path -> {
        FileSystem fileSystem = path.getFileSystem(hadoopConf.get());
        return Pair.of(path, fileSystem.listStatus(path));
      }, listingParallelism);
      // 清空pathsToList
      pathsToList.clear();

      // If the listing reveals a directory, add it to queue. If the listing reveals a hoodie partition, add it to
      // the results.
      dirToFileListing.forEach(p -> {
        // 过滤查找分区路径`HOODIE_PARTITION_METAFILE`即.hoodie_partition_metadata
        Option<FileStatus> partitionMetaFile = Option.fromJavaOptional(Arrays.stream(p.getRight()).parallel()
            .filter(fs -> fs.getPath().getName().equals(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE))
            .findFirst());

        if (partitionMetaFile.isPresent()) {
          // 如果分区路径存在,那么代表当前遍历的路径是分区路径
          // 即p.getLeft(),通过`getRelativePartitionPath`将相对路径添加到partitionPaths列表中
          // 如果没有分区字段的话,当前路径也就是datasetBasePath就是分区路径,相对路径为空""
          // 如果有分区字段,相对路径为所有的 `partitionField1=partitionField1Val/partitionField2=partitionField2Val/...`
          // 这里假设是Hive格式的分区路径
          // Is a partition.
          String partitionName = FSUtils.getRelativePartitionPath(new Path(datasetBasePath), p.getLeft());
          partitionPaths.add(partitionName);
        } else {
          // 如果当前路径下没有分区路径,那么继续遍历子路径,直到找到分区路径,或者遍历完所有的子路径
          // 这里的子路径不包含`METAFOLDER_NAME`,即.hoodie
          // 如果是有分区字段的话,需要遍历完所有的分区路径
          // Add sub-dirs to the queue
          pathsToList.addAll(Arrays.stream(p.getRight())
              .filter(fs -> fs.isDirectory() && !fs.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME))
              .map(fs -> fs.getPath())
              .collect(Collectors.toList()));
        }
      });
    }
    // 返回分区路径列表
    return partitionPaths;
  } 

getPartitionPathsForIncrementalCleaning

用增量模式查找分区路径,过滤大于等于上次.clean保存的earliestCommitToRetain && 小于这次earliestCommitToRetain([lastEarliestCommitToRetain,thisEarliestCommitToRetain))的commit,这个区间的commit所涉及的分区即为要清理的分区,具体方法为replaceCommitMetadata.getPartitionToReplaceFileIds
commitMetadata.getPartitionToWriteStats()

  /**
   * Use Incremental Mode for finding partition paths.
   * 用增量模式查找分区路径
   *
   * @param cleanMetadata
   * @param newInstantToRetain
   * @return
   */

  private List<String> getPartitionPathsForIncrementalCleaning(HoodieCleanMetadata cleanMetadata,
      Option<HoodieInstant> newInstantToRetain)
 
{
    LOG.info("Incremental Cleaning mode is enabled. Looking up partition-paths that have since changed "
        + "since last cleaned at " + cleanMetadata.getEarliestCommitToRetain()
        + ". New Instant to retain : " + newInstantToRetain);
    // 过滤大于等于上次.clean保存的earliestCommitToRetain && 小于这次earliestCommitToRetain的commit ([lastEarliestCommitToRetain,thisEarliestCommitToRetain))
    // 这个区间的commit所涉及的分区即为要清理的分区
    return hoodieTable.getCompletedCommitsTimeline().getInstants().filter(
        instant -> HoodieTimeline.compareTimestamps(instant.getTimestamp(), HoodieTimeline.GREATER_THAN_OR_EQUALS,
            cleanMetadata.getEarliestCommitToRetain()) && HoodieTimeline.compareTimestamps(instant.getTimestamp(),
            HoodieTimeline.LESSER_THAN, newInstantToRetain.get().getTimestamp())).flatMap(instant -> {
              try {
                if (HoodieTimeline.REPLACE_COMMIT_ACTION.equals(instant.getAction())) {
                  // 如果instant类型为`REPLACE_COMMIT_ACTION`,即`replacecommit`
                  // 反序列化为HoodieReplaceCommitMetadata,并返回replaceCommitMetadata涉及的所有分区路径
                  HoodieReplaceCommitMetadata replaceCommitMetadata = HoodieReplaceCommitMetadata.fromBytes(
                      hoodieTable.getActiveTimeline().getInstantDetails(instant).get(), HoodieReplaceCommitMetadata.class);
                  return Stream.concat(replaceCommitMetadata.getPartitionToReplaceFileIds().keySet().stream(), replaceCommitMetadata.getPartitionToWriteStats().keySet().stream());
                } else {
                  // 否则,反序列化为HoodieCommitMetadata,返回commitMetadata所涉及的所有分区路径
                  HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
                      .fromBytes(hoodieTable.getActiveTimeline().getInstantDetails(instant).get(),
                          HoodieCommitMetadata.class);
                  return commitMetadata.getPartitionToWriteStats().keySet().stream();
                }
              } catch (IOException e) {
                throw new HoodieIOException(e.getMessage(), e);
              }
            }).distinct().collect(Collectors.toList());
  }

getDeletePaths

基于清理策略根据给出的分区路径返回需要被清理的文件列表
KEEP_LATEST_COMMITS
: 调用getFilesToCleanKeepingLatestCommits

KEEP_LATEST_FILE_VERSIONS
: 调用getFilesToCleanKeepingLatestVersions

  /**
   * Returns files to be cleaned for the given partitionPath based on cleaning policy.
   * 基于清理策略根据给出的分区路径返回需要被清理的文件列表
   */

  public List<CleanFileInfo> getDeletePaths(String partitionPath) {
    HoodieCleaningPolicy policy = config.getCleanerPolicy();
    List<CleanFileInfo> deletePaths;
    if (policy == HoodieCleaningPolicy.KEEP_LATEST_COMMITS) {
      // 当策略为`KEEP_LATEST_COMMITS`,调用`getFilesToCleanKeepingLatestCommits`
      deletePaths = getFilesToCleanKeepingLatestCommits(partitionPath);
    } else if (policy == HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS) {
      deletePaths = getFilesToCleanKeepingLatestVersions(partitionPath);
    } else {
      throw new IllegalArgumentException("Unknown cleaning policy : " + policy.name());
    }
    LOG.info(deletePaths.size() + " patterns used to delete in partition path:" + partitionPath);

    return deletePaths;
  }

getFilesToCleanKeepingLatestCommits

1、获取分区路径partitionPath
下所有的文件组fileGroups
(fileGroup
是指所有文件fileId相同的为一个文件组)
2、遍历fileGroups
,获取每个fileGroup
对应的所有的数据(parquet)文件fileSliceList

3、遍历fileSliceList
,判断每一个文件,首先过滤掉文件是savepointedFile
或者是最新版本或者是小于earliestCommitToRetain
的最近一次版本的文件,再将不需要为compaction
操作保留并且fileCommitTime < earliestCommitToRetain
的文件添加到返回的需要删除的文件列表中deletePaths
。也就是clean时不会清理savepointedFile
、最新版本和小于earliestCommitToRetain
的最近一次版本的文件,还有也不会清理不需要为compaction
操作保留的文件,其余的只要文件时间小于earliestCommitToRetain
都是我们要返回需要删除的文件

  private List<CleanFileInfo> getFilesToCleanKeepingLatestCommits(String partitionPath) {
    // commit保留数,默认10
    int commitsRetained = config.getCleanerCommitsRetained();
    LOG.info("Cleaning " + partitionPath + ", retaining latest " + commitsRetained + " commits. ");
    // 初始化需要删除的路径列表,类型为`CleanFileInfo`
    List<CleanFileInfo> deletePaths = new ArrayList<>();

    // Collect all the datafiles savepointed by all the savepoints
    // 所有的通过savepoints保存的文件
    List<String> savepointedFiles = hoodieTable.getSavepoints().stream()
        .flatMap(this::getSavepointedDataFiles)
        .collect(Collectors.toList());

    // determine if we have enough commits, to start cleaning.
    if (commitTimeline.countInstants() > commitsRetained) { // commit总数大于commit保留数
      // 最早需要保留的commit
      Option<HoodieInstant> earliestCommitToRetainOption = getEarliestCommitToRetain();
      HoodieInstant earliestCommitToRetain = earliestCommitToRetainOption.get();
      // all replaced file groups before earliestCommitToRetain are eligible to clean
      deletePaths.addAll(getReplacedFilesEligibleToClean(savepointedFiles, partitionPath, earliestCommitToRetainOption));
      // add active files
      // 所有的fileGroup,简单说fileId一样的为一组
      List<HoodieFileGroup> fileGroups = fileSystemView.getAllFileGroups(partitionPath).collect(Collectors.toList());
      for (HoodieFileGroup fileGroup : fileGroups) { // 遍历fileGroups
        // 每个fileGroup对应的所有的数据(parquet)文件
        List<FileSlice> fileSliceList = fileGroup.getAllFileSlices().collect(Collectors.toList());

        if (fileSliceList.isEmpty()) {
          continue;
        }
        // 该fileGroup中最新的版本
        String lastVersion = fileSliceList.get(0).getBaseInstantTime();
        // 小于earliestCommitToRetain的最近一次版本,该文件版本可能依旧被用来查询
        String lastVersionBeforeEarliestCommitToRetain =
            getLatestVersionBeforeCommit(fileSliceList, earliestCommitToRetain);

        // Ensure there are more than 1 version of the file (we only clean old files from updates)
        // i.e always spare the last commit.
        for (FileSlice aSlice : fileSliceList) {
          Option<HoodieBaseFile> aFile = aSlice.getBaseFile();
          String fileCommitTime = aSlice.getBaseInstantTime();
          if (aFile.isPresent() && savepointedFiles.contains(aFile.get().getFileName())) {
            // do not clean up a savepoint data file
            continue;
          }
          // Dont delete the latest commit and also the last commit before the earliest commit we
          // are retaining
          // The window of commit retain == max query run time. So a query could be running which
          // still
          // uses this file.
          if (fileCommitTime.equals(lastVersion) || (fileCommitTime.equals(lastVersionBeforeEarliestCommitToRetain))) {
            // move on to the next file
            continue;
          }

          // Always keep the last commit
          // 如果不需要为`compaction`操作保留这个文件 && earliestCommitToRetain > fileCommitTime
          if (!isFileSliceNeededForPendingCompaction(aSlice) && HoodieTimeline
              .compareTimestamps(earliestCommitToRetain.getTimestamp(), HoodieTimeline.GREATER_THAN, fileCommitTime)) {
            // this is a commit, that should be cleaned.
            aFile.ifPresent(hoodieDataFile -> {
              // 将该文件添加到deletePaths
              deletePaths.add(new CleanFileInfo(hoodieDataFile.getPath(), false));
              if (hoodieDataFile.getBootstrapBaseFile().isPresent() && config.shouldCleanBootstrapBaseFile()) {
                deletePaths.add(new CleanFileInfo(hoodieDataFile.getBootstrapBaseFile().get().getPath(), true));
              }
            });
            if (hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) {
              // If merge on read, then clean the log files for the commits as well
              deletePaths.addAll(aSlice.getLogFiles().map(lf -> new CleanFileInfo(lf.getPath().toString(), false))
                  .collect(Collectors.toList()));
            }
          }
        }
      }
    }
    // 返回需要删除的文件列表
    return deletePaths;
  }

  /**
   * Gets the latest version < instantTime. This version file could still be used by queries.
   * 找到最近的小于instantTime的版本的文件,这个版本的文件可能依旧被用来查询
   *
   * instantTime:earliestCommitToRetain
   *
   * 对于比较早的最近没有被操作的fileGroup,获得的文件等于该fileGroup对应的最新版本的文件,因为最新版本的文件的
   * 时间小于earliestCommitToRetain
   *
   */

  private String getLatestVersionBeforeCommit(List<FileSlice> fileSliceList, HoodieInstant instantTime) {
    // 遍历fileSliceList,fileSliceList内部有序(reverseOrder),文件按日期从大到小排序,
    for (FileSlice file : fileSliceList) {
      String fileCommitTime = file.getBaseInstantTime();
      if (HoodieTimeline.compareTimestamps(instantTime.getTimestamp(), HoodieTimeline.GREATER_THAN, fileCommitTime)) {
        // fileList is sorted on the reverse, so the first commit we find < instantTime is the
        // one we want
        // 直到instantTime > fileCommitTime, 返回fileCommitTime
        return fileCommitTime;
      }
    }
    // There is no version of this file which is < instantTime
    return null;
  }

  private List<CleanFileInfo> getReplacedFilesEligibleToClean(List<String> savepointedFiles, String partitionPath, Option<HoodieInstant> earliestCommitToRetain) {
    final Stream<HoodieFileGroup> replacedGroups;
    // 当策略为`KEEP_LATEST_COMMITS`,earliestCommitToRetain一定存在
    if (earliestCommitToRetain.isPresent()) {
      // fileSystemView 为`org.apache.hudi.common.table.view.PriorityBasedFileSystemView`
      replacedGroups = fileSystemView.getReplacedFileGroupsBefore(earliestCommitToRetain.get().getTimestamp(), partitionPath);
    } else {
      replacedGroups = fileSystemView.getAllReplacedFileGroups(partitionPath);
    }
    return replacedGroups.flatMap(HoodieFileGroup::getAllFileSlices)
        // do not delete savepointed files  (archival will make sure corresponding replacecommit file is not deleted)
        .filter(slice -> !slice.getBaseFile().isPresent() || !savepointedFiles.contains(slice.getBaseFile().get().getFileName()))
        .flatMap(slice -> getCleanFileInfoForSlice(slice).stream())
        .collect(Collectors.toList());
  }

  /**
   * 对于clean操作:
   *    maxCommitTime 最早需要保留的commit
   *    partitionPath 分区相对路径
   *    preferredView org.apache.hudi.common.table.view.RemoteHoodieTableFileSystemView
   *    secondaryView org.apache.hudi.common.table.view.HoodieTableFileSystemView
   */

  public Stream<HoodieFileGroup> getReplacedFileGroupsBefore(String maxCommitTime, String partitionPath) {
    return execute(maxCommitTime, partitionPath, preferredView::getReplacedFileGroupsBefore, secondaryView::getReplacedFileGroupsBefore);
  }

  private <T1, T2, R> execute(T1 val, T2 val2, Function2<T1, T2, R> preferredFunction,
      Function2<T1, T2, R> secondaryFunction)
 
{
    if (errorOnPreferredView) { // 默认false
      LOG.warn("Routing request to secondary file-system view");
      return secondaryFunction.apply(val, val2);
    } else {
      try {
        /**
         * 对于getReplacedFileGroupsBefore:
         *    这里调用的是`preferredView::getReplacedFileGroupsBefore`
         *    即`RemoteHoodieTableFileSystemView.getReplacedFileGroupsBefore`
         */

        return preferredFunction.apply(val, val2);
      } catch (RuntimeException re) {
        LOG.error("Got error running preferred function. Trying secondary", re);
        errorOnPreferredView = true;
        return secondaryFunction.apply(val, val2);
      }
    }
  }

  @Override
  public Stream<HoodieFileGroup> getReplacedFileGroupsBefore(String maxCommitTime, String partitionPath) {
    /**
     * HashMap
     *  "partition" -> "partitionPath"
     *  "maxinstant" -> "earliestCommitToRetain"
     *  "basepath" -> "basepath"
     */

    Map<String, String> paramsMap = getParamsWithAdditionalParam(partitionPath, MAX_INSTANT_PARAM, maxCommitTime);
    try {
      // 这里是通过HttpRequest.get api接口的方式实现的,接口为 `/v1/hoodie/view/filegroups/replaced/before/`
      // 接口是在类`RequestHandler`添加的,具体的实现在`registerFileSlicesAPI`方法中,这一块还不太懂
      // 这里(目前遇到的都是)返回的是空,以后再深入研究
      List<FileGroupDTO> fileGroups = executeRequest(ALL_REPLACED_FILEGROUPS_BEFORE, paramsMap,
          new TypeReference<List<FileGroupDTO>>() {}, RequestMethod.GET);
      return fileGroups.stream().map(dto -> FileGroupDTO.toFileGroup(dto, metaClient));
    } catch (IOException e) {
      throw new HoodieRemoteException(e);
    }
  }

getFilesToCleanKeepingLatestVersions

1、遍历fileGroups
2、获取每个fileGroup
对应的所有的数据(parquet),过滤掉需要为compaction
操作保留的文件:fileSliceIterator

3、如果该fileGroup
PendingCompaction
中,保留版本数减一
4、遍历fileSliceIterator
,保留对应的版本数的最新文件,其余的都是要删除的文件,添加的返回列表中

  /**
   * Selects the older versions of files for cleaning, such that it bounds the number of versions of each file. This
   * policy is useful, if you are simply interested in querying the table, and you don't want too many versions for a
   * single file (i.e run it with versionsRetained = 1)
   *
   * 选择需要清理的旧版本文件,以限制每个文件的版本数。
   * 如果你只对查询表感兴趣,并且不希望一个文件有太多版本(比如保留一个文件版本),那么策略很有用
   */

  private List<CleanFileInfo> getFilesToCleanKeepingLatestVersions(String partitionPath) {
    LOG.info("Cleaning " + partitionPath + ", retaining latest " + config.getCleanerFileVersionsRetained()
        + " file versions. ");
    // 初始化需要删除的路径列表,类型为`CleanFileInfo`
    List<CleanFileInfo> deletePaths = new ArrayList<>();
    // Collect all the datafiles savepointed by all the savepoints
    // 所有的通过savepoints保存的文件
    List<String> savepointedFiles = hoodieTable.getSavepoints().stream()
        .flatMap(this::getSavepointedDataFiles)
        .collect(Collectors.toList());

    // In this scenario, we will assume that once replaced a file group automatically becomes eligible for cleaning completely
    // In other words, the file versions only apply to the active file groups.
    deletePaths.addAll(getReplacedFilesEligibleToClean(savepointedFiles, partitionPath, Option.empty()));
    // 所有的fileGroup,简单说fileId一样的为一组
    List<HoodieFileGroup> fileGroups = fileSystemView.getAllFileGroups(partitionPath).collect(Collectors.toList());
    for (HoodieFileGroup fileGroup : fileGroups) { // 遍历fileGroups
      // 需要保留的文件版本数,通过`hoodie.cleaner.fileversions.retained`配置,默认3
      int keepVersions = config.getCleanerFileVersionsRetained();
      // do not cleanup slice required for pending compaction
      // 每个fileGroup对应的所有的数据(parquet),过滤掉需要为`compaction`操作保留的文件
      Iterator<FileSlice> fileSliceIterator =
          fileGroup.getAllFileSlices().filter(fs -> !isFileSliceNeededForPendingCompaction(fs)).iterator();
      if (isFileGroupInPendingCompaction(fileGroup)) { // 如果fileGroup在PendingCompaction中
        // We have already saved the last version of file-groups for pending compaction Id
        // 版本数减一,因为我们已经将最新的版本保存在`pending compaction`中了
        keepVersions--;
      }

      // 遍历需要保留的版本文件
      while (fileSliceIterator.hasNext() && keepVersions > 0) { // 当有历史版本且keepVersions>0
        // Skip this most recent version
        // 取下一个版本的文件
        FileSlice nextSlice = fileSliceIterator.next();
        Option<HoodieBaseFile> dataFile = nextSlice.getBaseFile();
        if (dataFile.isPresent() && savepointedFiles.contains(dataFile.get().getFileName())) {
          // do not clean up a savepoint data file
          // 不清理`savepoint data file`
          continue;
        }
        // 版本数减一
        keepVersions--;
      }

      // Delete the remaining files
      // 剩下的文件都是需要删除的文件
      while (fileSliceIterator.hasNext()) {
        FileSlice nextSlice = fileSliceIterator.next();
        deletePaths.addAll(getCleanFileInfoForSlice(nextSlice));
      }
    }
    return deletePaths;
  }

总结

相同点

不管是哪种策略,都是先遍历fileGroups
,再遍历fileGroup
每个对应的FileSlices
,每种策略都不删除savepoints
PendingCompaction
文件,都保留文件的最新版本

区别

KEEP_LATEST_COMMITS
: 保留最新的commit数,除了上述相同点讲的保留的文件,还多保留一个commit,即通过方法getLatestVersionBeforeCommit
获取的小于earliestCommitToRetain的最近一次版本,但我觉得没有必要保留这个文件,我已经提了PR: https://github.com/apache/hudi/pull/5406,不确定我认为的对不对
对于比较早没有新写入的fileGroup
,只保留一个最新版本,因为因为最新版本的文件的时间小于earliestCommitToRetain,那么getLatestVersionBeforeCommit
返回的文件就是最新版本的文件

KEEP_LATEST_FILE_VERSIONS
: 每个fileGroup
都保留相同的版本数


文章转载自伦少的博客,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论