
Hudi Clean: How File Cleaning Is Implemented

伦少的博客 2022-05-20

Preface

This post analyzes, at the source-code level, how Hudi Clean is implemented. If you are not yet familiar with Hudi Clean, start with this article: 一文彻底理解Apache Hudi的清理服务 ("Fully Understanding Apache Hudi's Cleaning Service").
Hudi Clean deletes historical files that are no longer needed. Its behavior can be tuned with configuration parameters to fit the workload, but cleaning must never break queries: if a running query is still reading a file and Clean deletes it, the query fails.
Clean only removes historical versions. Hudi keeps multiple versions of each file, and no matter which parameters or policy are configured, the current latest version of a file is never deleted.
Hudi 0.9.0 offers two cleaning policies, KEEP_LATEST_COMMITS and KEEP_LATEST_FILE_VERSIONS; the default is KEEP_LATEST_COMMITS.

KEEP_LATEST_COMMITS: cleaning is driven by the number of commits. By default all files written by the latest 10 commits are retained; for data older than those 10 commits, only each file's latest version is kept and the older versions are deleted.
KEEP_LATEST_FILE_VERSIONS: cleaning is driven by the number of file versions to keep; by default 3 versions are retained.
See the article linked above for details.

The latest release, 0.11.0, adds a new policy, KEEP_LATEST_BY_HOURS: cleaning by age in hours, retaining files from the last 24 hours by default. For the implementation, see the PR: [HUDI-349] Added new cleaning policy based on number of hours
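
The policy and its thresholds are set on the write client. Below is a minimal configuration sketch against the 0.9.0 Java API; the builder methods shown (withAutoClean, withAsyncClean, withCleanerPolicy, retainCommits, retainFileVersions) correspond to the hoodie.clean.* / hoodie.cleaner.* properties, and tablePath is an illustrative value — verify the exact options against your Hudi version.

import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;

String tablePath = "/tmp/hoodie/test_table"; // illustrative base path
// Cleaning-related settings live in HoodieCompactionConfig in 0.9.0
HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder()
    .withPath(tablePath)
    .withCompactionConfig(HoodieCompactionConfig.newBuilder()
        .withAutoClean(true)             // hoodie.clean.automatic, default true
        .withAsyncClean(false)           // hoodie.clean.async, default false
        .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)
        .retainCommits(10)               // used by KEEP_LATEST_COMMITS
        //.retainFileVersions(3)         // used by KEEP_LATEST_FILE_VERSIONS
        .build())
    .build();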

This article analyzes Hudi 0.9.0 with the Java client and a COW (copy-on-write) table.

Insert

HoodieJavaWriteClient->postWrite->postCommit->autoCleanOnCommit

We trace the code starting from Insert. The Hudi source ships a Java client example; only the key lines are shown here.

writeClient = new HoodieJavaWriteClient<>(new HoodieJavaEngineContext(hadoopConf), cfg);
String newCommitTime = writeClient.startCommit();
writeClient.insert(records, newCommitTime);

HoodieJavaWriteClient.insert

  public List<WriteStatus> insert(List<HoodieRecord<T>> records, String instantTime) {
    HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table =
        getTableAndInitCtx(WriteOperationType.INSERT, instantTime);
    table.validateUpsertSchema();
    preWrite(instantTime, WriteOperationType.INSERT, table.getMetaClient());
    HoodieWriteMetadata<List<WriteStatus>> result = table.insert(context, instantTime, records);
    if (result.getIndexLookupDuration().isPresent()) {
      metrics.updateIndexMetrics(LOOKUP_STR, result.getIndexLookupDuration().get().toMillis());
    }
    return postWrite(result, instantTime, table);
  }

After table.insert has finished writing the data, postWrite is invoked.

  @Override
  protected List<WriteStatus> postWrite(HoodieWriteMetadata<List<WriteStatus>> result,
                                        String instantTime,
                                        HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> hoodieTable) {
    if (result.getIndexLookupDuration().isPresent()) {
      metrics.updateIndexMetrics(getOperationType().name(), result.getIndexUpdateDuration().get().toMillis());
    }
    if (result.isCommitted()) {
      // Perform post commit operations.
      if (result.getFinalizeDuration().isPresent()) {
        metrics.updateFinalizeWriteMetrics(result.getFinalizeDuration().get().toMillis(),
            result.getWriteStats().get().size());
      }

      postCommit(hoodieTable, result.getCommitMetadata().get(), instantTime, Option.empty());

      emitCommitMetrics(instantTime, result.getCommitMetadata().get(), hoodieTable.getMetaClient().getCommitActionType());
    }
    return result.getWriteStatuses();
  }

postWrite then calls the parent-class method AbstractHoodieWriteClient.postCommit.

  protected void postCommit(HoodieTable<T, I, K, O> table, HoodieCommitMetadata metadata, String instantTime, Option<Map<String, String>> extraMetadata) {
    try {
      // Delete the marker directory for the instant.
      WriteMarkersFactory.get(config.getMarkersType(), table, instantTime)
          .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
      // We cannot have unbounded commit files. Archive commits if we have to archive
      HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(config, table);
      archiveLog.archiveIfRequired(context);
      // Run auto clean as part of the commit
      autoCleanOnCommit();
      if (operationType != null && operationType != WriteOperationType.CLUSTER && operationType != WriteOperationType.COMPACT) {
        syncTableMetadata();
      }
    } catch (IOException ioe) {
      throw new HoodieIOException(ioe.getMessage(), ioe);
    } finally {
      this.heartbeatClient.stop(instantTime);
    }
  }

postCommit calls autoCleanOnCommit() to perform the actual cleaning.

AbstractHoodieWriteClient.autoCleanOnCommit

autoCleanOnCommit->clean->scheduleTableServiceInternal->HoodieJavaCopyOnWriteTable.clean

scheduleTableServiceInternal is called first. Based on the configured cleaning policy, it determines the earliest instant that must be retained (earliestInstant), then the list of partition paths that need cleaning (partitionsToClean), and then, per partition path, the list of files to delete. This information is packaged into a HoodieCleanerPlan and serialized into a newly created .clean.requested file.
HoodieJavaCopyOnWriteTable.clean is then executed. It picks up the .clean.requested just created, plus any .clean.inflight files left over from previously failed cleans (if any), deserializes the saved .clean.requested content back into a HoodieCleanerPlan, deletes the files listed in the plan one by one via deleteFilesFunc, and returns HoodieCleanStats. Finally the HoodieCleanStats are used to build a HoodieCleanMetadata, which is serialized into a newly created .clean file. At that point the clean operation is essentially complete.
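
To observe these state transitions on an actual table, the clean timeline can be read through the meta client. A minimal sketch (tablePath is an illustrative variable; it assumes a Hudi 0.9.0 table already exists at that path):

import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieTimeline;

String tablePath = "/tmp/hoodie/test_table"; // illustrative base path
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
    .setConf(new Configuration())
    .setBasePath(tablePath)
    .build();

// The cleaner timeline only contains clean instants:
// .clean.requested -> .clean.inflight -> .clean (completed)
HoodieTimeline cleanTimeline = metaClient.getActiveTimeline().getCleanerTimeline();
cleanTimeline.getInstants().forEach(instant ->
    System.out.println(instant.getTimestamp() + " " + instant.getState() + " " + instant.getFileName()));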

How the list of files to clean is derived from the cleaning policy is covered in the later section "Obtaining the list of files to delete".

  /**
   * Handle auto clean during commit.
   */
  protected void autoCleanOnCommit() {
    if (config.isAutoClean()) { // default: true
      // Call clean to cleanup if there is anything to cleanup after the commit,
      if (config.isAsyncClean()) { // default: false
        LOG.info("Cleaner has been spawned already. Waiting for it to finish");
        AsyncCleanerService.waitForCompletion(asyncCleanerService);
        LOG.info("Cleaner has finished");
      } else {
        // Do not reuse instantTime for clean as metadata table requires all changes to have unique instant timestamps.
        LOG.info("Auto cleaning is enabled. Running cleaner now");
        // Run the clean
        clean();
      }
    }
  }

  public HoodieCleanMetadata clean() {
    return clean(HoodieActiveTimeline.createNewInstantTime());
  }

  public HoodieCleanMetadata clean(String cleanInstantTime) throws HoodieIOException {
    return clean(cleanInstantTime, true);
  }

The above is only the call chain; the real logic starts below.

  /**
   * Clean up any stale/old files/data lying around (either on file storage or index storage) based on the
   * configurations and CleaningPolicy used. (typically files that no longer can be used by a running query can be
   * cleaned). This API provides the flexibility to schedule clean instant asynchronously via
   * {@link AbstractHoodieWriteClient#scheduleTableService(String, Option, TableServiceType)} and disable inline scheduling
   * of clean.
   */
  public HoodieCleanMetadata clean(String cleanInstantTime, boolean scheduleInline) throws HoodieIOException {
    if (scheduleInline) {
      // Main logic: create .clean.requested, whose content is the serialized cleanerPlan
      // (which contains the list of files to delete, among other information)
      scheduleTableServiceInternal(cleanInstantTime, Option.empty(), TableServiceType.CLEAN);
    }
    LOG.info("Cleaner started");
    final Timer.Context timerContext = metrics.getCleanCtx();
    LOG.info("Cleaned failed attempts if any");
    // Decide whether to roll back failed writes; the default policy is EAGER, so no rollback is done during clean
    CleanerUtils.rollbackFailedWrites(config.getFailedWritesCleanPolicy(),
        HoodieTimeline.CLEAN_ACTION, () -> rollbackFailedWrites());
    // Run table.clean: delete the files, transition .clean.requested => .clean.inflight => .clean, and return HoodieCleanMetadata
    // Here the table is HoodieJavaCopyOnWriteTable, so this resolves to HoodieJavaCopyOnWriteTable.clean
    HoodieCleanMetadata metadata = createTable(config, hadoopConf).clean(context, cleanInstantTime);
    if (timerContext != null && metadata != null) {
      long durationMs = metrics.getDurationInMs(timerContext.stop());
      metrics.updateCleanMetrics(durationMs, metadata.getTotalFilesDeleted());
      LOG.info("Cleaned " + metadata.getTotalFilesDeleted() + " files"
          + " Earliest Retained Instant :" + metadata.getEarliestCommitToRetain()
          + " cleanerElapsedMs" + durationMs);
    }
    // Return the metadata
    return metadata;
  }

scheduleTableServiceInternal

  private Option<String> scheduleTableServiceInternal(String instantTime, Option<Map<String, String>> extraMetadata,
                                                      TableServiceType tableServiceType) {
    switch (tableServiceType) {
      case CLUSTER:
        LOG.info("Scheduling clustering at instant time :" + instantTime);
        Option<HoodieClusteringPlan> clusteringPlan = createTable(config, hadoopConf)
            .scheduleClustering(context, instantTime, extraMetadata);
        return clusteringPlan.isPresent() ? Option.of(instantTime) : Option.empty();
      case COMPACT:
        LOG.info("Scheduling compaction at instant time :" + instantTime);
        Option<HoodieCompactionPlan> compactionPlan = createTable(config, hadoopConf)
            .scheduleCompaction(context, instantTime, extraMetadata);
        return compactionPlan.isPresent() ? Option.of(instantTime) : Option.empty();
      case CLEAN:
        LOG.info("Scheduling cleaning at instant time :" + instantTime);
        // Implemented by the subclass: HoodieJavaCopyOnWriteTable.scheduleCleaning
        Option<HoodieCleanerPlan> cleanerPlan = createTable(config, hadoopConf)
            .scheduleCleaning(context, instantTime, extraMetadata);
        return cleanerPlan.isPresent() ? Option.of(instantTime) : Option.empty();
      default:
        throw new IllegalArgumentException("Invalid TableService " + tableServiceType);
    }
  }

HoodieJavaCopyOnWriteTable.scheduleCleaning

  public Option<HoodieCleanerPlan> scheduleCleaning(HoodieEngineContext context, String instantTime, Option<Map<String, String>> extraMetadata) {
    return new JavaScheduleCleanActionExecutor<>(context, config, this, instantTime, extraMetadata).execute();
  }

  public Option<HoodieCleanerPlan> execute() {
    // Plan a new clean action
    return requestClean(instantTime);
  }

  /**
   * Creates a Cleaner plan if there are files to be cleaned and stores them in instant file.
   * Cleaner Plan contains absolute file paths.
   *
   * @param startCleanTime Cleaner Instant Time
   * @return Cleaner Plan if generated
   */
  protected Option<HoodieCleanerPlan> requestClean(String startCleanTime) {
    // cleanerPlan contains the list of files to be cleaned
    final HoodieCleanerPlan cleanerPlan = requestClean(context);
    if ((cleanerPlan.getFilePathsToBeDeletedPerPartition() != null)
        && !cleanerPlan.getFilePathsToBeDeletedPerPartition().isEmpty()
        && cleanerPlan.getFilePathsToBeDeletedPerPartition().values().stream().mapToInt(List::size).sum() > 0) {
      // The list of files to delete is not empty
      // Only create cleaner plan which does some work
      // Create .clean.requested
      final HoodieInstant cleanInstant = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.CLEAN_ACTION, startCleanTime);
      // Save to both aux and timeline folder
      try {
        // Save .clean.requested; it contains the serialized cleanerPlan (and therefore the list of files to delete),
        // which will be read back later when the files are actually deleted
        table.getActiveTimeline().saveToCleanRequested(cleanInstant, TimelineMetadataUtils.serializeCleanerPlan(cleanerPlan));
        LOG.info("Requesting Cleaning with instant time " + cleanInstant);
      } catch (IOException e) {
        LOG.error("Got exception when saving cleaner requested file", e);
        throw new HoodieIOException(e.getMessage(), e);
      }
      // Return the cleanerPlan
      return Option.of(cleanerPlan);
    }
    // Return empty
    return Option.empty();
  }

  /**
   * Generates List of files to be cleaned.
   *
   * @param context HoodieEngineContext
   * @return Cleaner Plan
   */
  HoodieCleanerPlan requestClean(HoodieEngineContext context) {
    try {
      CleanPlanner<T, I, K, O> planner = new CleanPlanner<>(context, table, config);
      // Get the earliest HoodieInstant that must be retained
      Option<HoodieInstant> earliestInstant = planner.getEarliestCommitToRetain();
      // Get the partition paths that need cleaning
      List<String> partitionsToClean = planner.getPartitionPathsToClean(earliestInstant);

      if (partitionsToClean.isEmpty()) {
        // No partition paths to clean: return an empty HoodieCleanerPlan
        LOG.info("Nothing to clean here. It is already clean");
        return HoodieCleanerPlan.newBuilder().setPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name()).build();
      }
      LOG.info("Total Partitions to clean : " + partitionsToClean.size() + ", with policy " + config.getCleanerPolicy());
      // Parallelism of the cleaning
      int cleanerParallelism = Math.min(partitionsToClean.size(), config.getCleanerParallelism());
      LOG.info("Using cleanerParallelism: " + cleanerParallelism);

      context.setJobStatus(this.getClass().getSimpleName(), "Generates list of file slices to be cleaned");
      // Map<partition path, files to delete>; the real work happens in planner.getDeletePaths
      Map<String, List<HoodieCleanFileInfo>> cleanOps = context
          .map(partitionsToClean, partitionPathToClean -> Pair.of(partitionPathToClean, planner.getDeletePaths(partitionPathToClean)), cleanerParallelism)
          .stream()
          .collect(Collectors.toMap(Pair::getKey, y -> CleanerUtils.convertToHoodieCleanFileInfoList(y.getValue())));

      // Build and return the HoodieCleanerPlan with:
      // earliestInstantToRetain = HoodieActionInstant derived from earliestInstant
      // policy = config.getCleanerPolicy().name()
      // filesToBeDeletedPerPartition = CollectionUtils.createImmutableMap(), an empty read-only map
      // version = 2
      // filePathsToBeDeletedPerPartition = cleanOps, the files-to-delete map computed above
      return new HoodieCleanerPlan(earliestInstant
          .map(x -> new HoodieActionInstant(x.getTimestamp(), x.getAction(), x.getState().name())).orElse(null),
          config.getCleanerPolicy().name(), CollectionUtils.createImmutableMap(),
          CleanPlanner.LATEST_CLEAN_PLAN_VERSION, cleanOps);
    } catch (IOException e) {
      throw new HoodieIOException("Failed to schedule clean operation", e);
    }
  }

HoodieJavaCopyOnWriteTable.clean

  public HoodieCleanMetadata clean(HoodieEngineContext context,
                                   String cleanInstantTime) {
    return new JavaCleanActionExecutor(context, config, this, cleanInstantTime).execute();
  }

BaseCleanActionExecutor.execute

  public HoodieCleanMetadata execute() {
    List<HoodieCleanMetadata> cleanMetadataList = new ArrayList<>();
    // If there are inflight(failed) or previously requested clean operation, first perform them
    // Get clean instants in inflight or requested state.
    // Since we just created a .clean.requested, it is included here;
    // any other .clean.inflight files belong to cleans that failed earlier and are executed as well.
    List<HoodieInstant> pendingCleanInstants = table.getCleanTimeline()
        .filterInflightsAndRequested().getInstants().collect(Collectors.toList());
    if (pendingCleanInstants.size() > 0) {
      pendingCleanInstants.forEach(hoodieInstant -> {
        LOG.info("Finishing previously unfinished cleaner instant=" + hoodieInstant);
        try {
          cleanMetadataList.add(runPendingClean(table, hoodieInstant));
        } catch (Exception e) {
          LOG.warn("Failed to perform previous clean operation, instant: " + hoodieInstant, e);
        }
      });
      table.getMetaClient().reloadActiveTimeline();
    }
    // return the last clean metadata for now
    // TODO (NA) : Clean only the earliest pending clean just like how we do for other table services
    // This requires the BaseCleanActionExecutor to be refactored as BaseCommitActionExecutor
    return cleanMetadataList.size() > 0 ? cleanMetadataList.get(cleanMetadataList.size() - 1) : null;
  }

  /**
   * Executes the Cleaner plan stored in the instant metadata.
   */
  HoodieCleanMetadata runPendingClean(HoodieTable<T, I, K, O> table, HoodieInstant cleanInstant) {
    try {
      // Deserialize the .clean.requested or .clean.inflight content into a cleanerPlan
      HoodieCleanerPlan cleanerPlan = CleanerUtils.getCleanerPlan(table.getMetaClient(), cleanInstant);
      return runClean(table, cleanInstant, cleanerPlan);
    } catch (IOException e) {
      throw new HoodieIOException(e.getMessage(), e);
    }
  }

  private HoodieCleanMetadata runClean(HoodieTable<T, I, K, O> table, HoodieInstant cleanInstant, HoodieCleanerPlan cleanerPlan) {
    ValidationUtils.checkArgument(cleanInstant.getState().equals(HoodieInstant.State.REQUESTED)
        || cleanInstant.getState().equals(HoodieInstant.State.INFLIGHT));

    try {
      final HoodieInstant inflightInstant;
      final HoodieTimer timer = new HoodieTimer();
      timer.startTimer();
      if (cleanInstant.isRequested()) {
        // If this is a .clean.requested, transition it to .clean.inflight
        inflightInstant = table.getActiveTimeline().transitionCleanRequestedToInflight(cleanInstant,
            TimelineMetadataUtils.serializeCleanerPlan(cleanerPlan));
      } else {
        inflightInstant = cleanInstant;
      }
      // Run clean, which mainly deletes the files and returns a list of HoodieCleanStat.
      // Implemented by the concrete executor, here JavaCleanActionExecutor
      List<HoodieCleanStat> cleanStats = clean(context, cleanerPlan);
      if (cleanStats.isEmpty()) {
        return HoodieCleanMetadata.newBuilder().build();
      }

      table.getMetaClient().reloadActiveTimeline();
      // Build HoodieCleanMetadata
      HoodieCleanMetadata metadata = CleanerUtils.convertCleanMetadata(
          inflightInstant.getTimestamp(),
          Option.of(timer.endTimer()),
          cleanStats
      );
      // Create the .clean file and serialize the metadata into it
      table.getActiveTimeline().transitionCleanInflightToComplete(inflightInstant,
          TimelineMetadataUtils.serializeCleanMetadata(metadata));
      LOG.info("Marked clean started on " + inflightInstant.getTimestamp() + " as complete");
      return metadata;
    } catch (IOException e) {
      throw new HoodieIOException("Failed to clean up after commit", e);
    }
  }

JavaCleanActionExecutor.clean

  List<HoodieCleanStat> clean(HoodieEngineContext context, HoodieCleanerPlan cleanerPlan) {
    // The list of files to delete
    Iterator<ImmutablePair<String, CleanFileInfo>> filesToBeDeletedPerPartition = cleanerPlan.getFilePathsToBeDeletedPerPartition().entrySet().stream()
        .flatMap(x -> x.getValue().stream().map(y -> new ImmutablePair<>(x.getKey(), new CleanFileInfo(y.getFilePath(), y.getIsBootstrapBaseFile())))).iterator();
    // Delete the files via deleteFilesFunc and collect a PartitionCleanStat per partition
    Stream<Pair<String, PartitionCleanStat>> partitionCleanStats =
        deleteFilesFunc(filesToBeDeletedPerPartition, table)
            .collect(Collectors.groupingBy(Pair::getLeft))
            .entrySet().stream()
            .map(x -> new ImmutablePair(x.getKey(), x.getValue().stream().map(y -> y.getRight()).reduce(PartitionCleanStat::merge).get()));

    Map<String, PartitionCleanStat> partitionCleanStatsMap = partitionCleanStats
        .collect(Collectors.toMap(Pair::getLeft, Pair::getRight));

    // Return PartitionCleanStat for each partition passed.
    return cleanerPlan.getFilePathsToBeDeletedPerPartition().keySet().stream().map(partitionPath -> {
      PartitionCleanStat partitionCleanStat = partitionCleanStatsMap.containsKey(partitionPath)
          ? partitionCleanStatsMap.get(partitionPath)
          : new PartitionCleanStat(partitionPath);
      HoodieActionInstant actionInstant = cleanerPlan.getEarliestInstantToRetain();
      return HoodieCleanStat.newBuilder().withPolicy(config.getCleanerPolicy()).withPartitionPath(partitionPath)
          .withEarliestCommitRetained(Option.ofNullable(
              actionInstant != null
                  ? new HoodieInstant(HoodieInstant.State.valueOf(actionInstant.getState()),
                  actionInstant.getAction(), actionInstant.getTimestamp())
                  : null))
          .withDeletePathPattern(partitionCleanStat.deletePathPatterns())
          .withSuccessfulDeletes(partitionCleanStat.successDeleteFiles())
          .withFailedDeletes(partitionCleanStat.failedDeleteFiles())
          .withDeleteBootstrapBasePathPatterns(partitionCleanStat.getDeleteBootstrapBasePathPatterns())
          .withSuccessfulDeleteBootstrapBaseFiles(partitionCleanStat.getSuccessfulDeleteBootstrapBaseFiles())
          .withFailedDeleteBootstrapBaseFiles(partitionCleanStat.getFailedDeleteBootstrapBaseFiles())
          .build();
    }).collect(Collectors.toList());
  }

  private static Stream<Pair<String, PartitionCleanStat>> deleteFilesFunc(Iterator<ImmutablePair<String, CleanFileInfo>> iter, HoodieTable table) {
    Map<String, PartitionCleanStat> partitionCleanStatMap = new HashMap<>();
    FileSystem fs = table.getMetaClient().getFs();

    while (iter.hasNext()) {
      Pair<String, CleanFileInfo> partitionDelFileTuple = iter.next();
      String partitionPath = partitionDelFileTuple.getLeft();
      Path deletePath = new Path(partitionDelFileTuple.getRight().getFilePath());
      String deletePathStr = deletePath.toString();
      Boolean deletedFileResult = null;
      try {
        // Delete the file and record whether the deletion succeeded
        deletedFileResult = deleteFileAndGetResult(fs, deletePathStr);
      } catch (IOException e) {
        LOG.error("Delete file failed");
      }
      if (!partitionCleanStatMap.containsKey(partitionPath)) {
        partitionCleanStatMap.put(partitionPath, new PartitionCleanStat(partitionPath));
      }
      boolean isBootstrapBasePathFile = partitionDelFileTuple.getRight().isBootstrapBaseFile();
      PartitionCleanStat partitionCleanStat = partitionCleanStatMap.get(partitionPath);
      if (isBootstrapBasePathFile) {
        // For Bootstrap Base file deletions, store the full file path.
        partitionCleanStat.addDeleteFilePatterns(deletePath.toString(), true);
        partitionCleanStat.addDeletedFileResult(deletePath.toString(), deletedFileResult, true);
      } else {
        partitionCleanStat.addDeleteFilePatterns(deletePath.getName(), false);
        partitionCleanStat.addDeletedFileResult(deletePath.getName(), deletedFileResult, false);
      }
    }
    return partitionCleanStatMap.entrySet().stream().map(e -> Pair.of(e.getKey(), e.getValue()));
  }

  protected static Boolean deleteFileAndGetResult(FileSystem fs, String deletePathStr) throws IOException {
    Path deletePath = new Path(deletePathStr);
    LOG.debug("Working on delete path :" + deletePath);
    try {
      boolean deleteResult = fs.delete(deletePath, false);
      if (deleteResult) {
        LOG.debug("Cleaned file at path :" + deletePath);
      }
      return deleteResult;
    } catch (FileNotFoundException fio) {
      // With cleanPlan being used for retried cleaning operations, its possible to clean a file twice
      return false;
    }
  }

Obtaining the list of files to delete

This part depends on the policy configuration and is somewhat more involved, so only the entry points are shown here; I will not dig deeper for now and plan to cover it in a separate post.

      Option<HoodieInstant> earliestInstant = planner.getEarliestCommitToRetain();
      List<String> partitionsToClean = planner.getPartitionPathsToClean(earliestInstant);
      planner.getDeletePaths(partitionPathToClean)

  /**
   * Returns earliest commit to retain based on cleaning policy.
   */
  public Option<HoodieInstant> getEarliestCommitToRetain() {
    Option<HoodieInstant> earliestCommitToRetain = Option.empty();
    int commitsRetained = config.getCleanerCommitsRetained();
    if (config.getCleanerPolicy() == HoodieCleaningPolicy.KEEP_LATEST_COMMITS
        && commitTimeline.countInstants() > commitsRetained) {
      earliestCommitToRetain = commitTimeline.nthInstant(commitTimeline.countInstants() - commitsRetained);
    }
    return earliestCommitToRetain;
  }
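
A quick worked example of this logic: with the default KEEP_LATEST_COMMITS policy and hoodie.cleaner.commits.retained = 10, suppose the commit timeline has 15 completed commits. Then countInstants() - commitsRetained = 5, and nthInstant(5) (0-based) returns the 6th-oldest commit, i.e. the earliest of the 10 most recent commits. File versions written before that instant become candidates for deletion, except that each file group's latest version is always kept.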

  /**
   * Returns list of partitions where clean operations needs to be performed.
   *
   * @param earliestRetainedInstant New instant to be retained after this cleanup operation
   * @return list of partitions to scan for cleaning
   * @throws IOException when underlying file-system throws this exception
   */
  public List<String> getPartitionPathsToClean(Option<HoodieInstant> earliestRetainedInstant) throws IOException {
    switch (config.getCleanerPolicy()) {
      case KEEP_LATEST_COMMITS:
        return getPartitionPathsForCleanByCommits(earliestRetainedInstant);
      case KEEP_LATEST_FILE_VERSIONS:
        return getPartitionPathsForFullCleaning();
      default:
        throw new IllegalStateException("Unknown Cleaner Policy");
    }
  }

  /**
   * Returns files to be cleaned for the given partitionPath based on cleaning policy.
   */
  public List<CleanFileInfo> getDeletePaths(String partitionPath) {
    HoodieCleaningPolicy policy = config.getCleanerPolicy();
    List<CleanFileInfo> deletePaths;
    if (policy == HoodieCleaningPolicy.KEEP_LATEST_COMMITS) {
      deletePaths = getFilesToCleanKeepingLatestCommits(partitionPath);
    } else if (policy == HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS) {
      deletePaths = getFilesToCleanKeepingLatestVersions(partitionPath);
    } else {
      throw new IllegalArgumentException("Unknown cleaning policy : " + policy.name());
    }
    LOG.info(deletePaths.size() + " patterns used to delete in partition path:" + partitionPath);

    return deletePaths;
  }

Annotated source code

https://github.com/dongkelun/hudi/tree/0.9.0-learning-comments
https://gitee.com/dongkelun/hudi/tree/0.9.0-learning-comments


