Preface
This post analyzes, at the source level, how Hudi Clean is implemented. If you are not yet familiar with Hudi Clean, first read: 一文彻底理解Apache Hudi的清理服务 ("Fully understand Apache Hudi's cleaning service in one article").
Hudi Clean deletes historical files that are no longer needed. Its parameters can be tuned for the actual workload, but cleaning must never affect queries: if a running query is reading a file and Clean deletes that file, the query will fail.
Clean only removes historical files. Hudi keeps multiple versions of each file, and no matter which parameters or policy you configure, the current latest version of a file is never deleted.
Hudi 0.9.0 ships two cleaning policies, KEEP_LATEST_COMMITS and KEEP_LATEST_FILE_VERSIONS, with KEEP_LATEST_COMMITS as the default:
KEEP_LATEST_COMMITS: retention based on the number of commits. By default all files of the latest 10 commits are kept; for files older than those 10 commits, only the latest version of each file is kept and all older versions are deleted.
KEEP_LATEST_FILE_VERSIONS: retention based on the number of file versions; by default 3 versions are kept.
For details, see the article linked above.
The latest version, 0.11.0, adds a new policy, KEEP_LATEST_BY_HOURS: cleaning by hours, keeping the most recent 24 hours of files by default. For the implementation see PR: [HUDI-349] Added new cleaning policy based on number of hours.
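For reference before diving into the source, here is a minimal sketch of how these policies are chosen through the write config with the 0.9.0 Java client analyzed below (the table path, table name, and retention numbers are made-up examples):

import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;

// Minimal sketch (Hudi 0.9.0); path and table name are placeholders
HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder()
    .withPath("file:/tmp/hudi_trips_cow")
    .forTable("hudi_trips_cow")
    .withCompactionConfig(HoodieCompactionConfig.newBuilder()
        .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS) // the default policy
        .retainCommits(10)        // commits retained by KEEP_LATEST_COMMITS (default 10)
        // .retainFileVersions(3) // versions retained by KEEP_LATEST_FILE_VERSIONS (default 3)
        .withAutoClean(true)      // hoodie.clean.automatic, default true
        .withAsyncClean(false)    // hoodie.clean.async, default false
        .build())
    .build();

A config object of this kind is what gets passed to the HoodieJavaWriteClient below.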
This post analyzes Hudi 0.9.0 with the Java client and a COW table.
Insert
HoodieJavaWriteClient->postWrite->postCommit->autoCleanOnCommit
We trace the code with Insert as the entry point. The Hudi source tree includes a Java client code example; only the key lines are shown here:
writeClient = new HoodieJavaWriteClient<>(new HoodieJavaEngineContext(hadoopConf), cfg);
String newCommitTime = writeClient.startCommit();
writeClient.insert(records, newCommitTime);
HoodieJavaWriteClient.insert
public List<WriteStatus> insert(List<HoodieRecord<T>> records, String instantTime) {
  HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table =
      getTableAndInitCtx(WriteOperationType.INSERT, instantTime);
  table.validateUpsertSchema();
  preWrite(instantTime, WriteOperationType.INSERT, table.getMetaClient());
  HoodieWriteMetadata<List<WriteStatus>> result = table.insert(context, instantTime, records);
  if (result.getIndexLookupDuration().isPresent()) {
    metrics.updateIndexMetrics(LOOKUP_STR, result.getIndexLookupDuration().get().toMillis());
  }
  return postWrite(result, instantTime, table);
}
After table.insert finishes writing the data, the postWrite method is invoked:
@Override
protected List<WriteStatus> postWrite(HoodieWriteMetadata<List<WriteStatus>> result,
                                      String instantTime,
                                      HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> hoodieTable) {
  if (result.getIndexLookupDuration().isPresent()) {
    metrics.updateIndexMetrics(getOperationType().name(), result.getIndexUpdateDuration().get().toMillis());
  }
  if (result.isCommitted()) {
    // Perform post commit operations.
    if (result.getFinalizeDuration().isPresent()) {
      metrics.updateFinalizeWriteMetrics(result.getFinalizeDuration().get().toMillis(),
          result.getWriteStats().get().size());
    }

    postCommit(hoodieTable, result.getCommitMetadata().get(), instantTime, Option.empty());

    emitCommitMetrics(instantTime, result.getCommitMetadata().get(), hoodieTable.getMetaClient().getCommitActionType());
  }
  return result.getWriteStatuses();
}
The postWrite method in turn calls the parent class method AbstractHoodieWriteClient.postCommit:
protected void postCommit(HoodieTable<T, I, K, O> table, HoodieCommitMetadata metadata, String instantTime, Option<Map<String, String>> extraMetadata) {
  try {
    // Delete the marker directory for the instant.
    WriteMarkersFactory.get(config.getMarkersType(), table, instantTime)
        .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
    // We cannot have unbounded commit files. Archive commits if we have to archive
    HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(config, table);
    archiveLog.archiveIfRequired(context);
    // Perform auto clean during commit
    autoCleanOnCommit();
    if (operationType != null && operationType != WriteOperationType.CLUSTER && operationType != WriteOperationType.COMPACT) {
      syncTableMetadata();
    }
  } catch (IOException ioe) {
    throw new HoodieIOException(ioe.getMessage(), ioe);
  } finally {
    this.heartbeatClient.stop(instantTime);
  }
}
postCommit then calls autoCleanOnCommit() to perform the file cleaning.
AbstractHoodieWriteClient.autoCleanOnCommit
autoCleanOnCommit->clean->scheduleTableServiceInternal->HoodieJavaCopyOnWriteTable.clean
First, scheduleTableServiceInternal is called. Based on the configured cleaning policy, it determines the earliest instant that must be retained (earliestInstant), obtains the list of partition paths to clean (partitionsToClean), and then computes the list of files to delete for each partition path. All of this information is wrapped into a HoodieCleanerPlan and serialized into a newly created .clean.requested file.
Then HoodieJavaCopyOnWriteTable.clean is executed. It first collects the .clean.requested instant just created, plus any leftover .clean.inflight instants from previously failed attempts (if any). It deserializes the saved .clean.requested content back into a HoodieCleanerPlan, deletes the files listed in the plan one by one via the deleteFilesFunc method and returns HoodieCleanStats, builds a HoodieCleanMetadata from those HoodieCleanStats, and finally serializes the HoodieCleanMetadata into a newly created .clean file. At that point the whole clean operation is essentially complete.
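To picture the state transitions described above, for a single clean action the timeline directory would roughly evolve like this (the instant time below is a made-up example):

.hoodie/20220312101530.clean.requested   <- serialized HoodieCleanerPlan (file lists to delete)
.hoodie/20220312101530.clean.inflight    <- requested transitions to inflight before files are deleted
.hoodie/20220312101530.clean             <- serialized HoodieCleanMetadata once deletion completes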
How the list of files to clean is derived from the cleaning policy is covered in the later section "Obtaining the list of files to delete".
/**
 * Handle auto clean during commit.
 */
protected void autoCleanOnCommit() {
  if (config.isAutoClean()) { // defaults to true
    // Call clean to cleanup if there is anything to cleanup after the commit,
    if (config.isAsyncClean()) { // defaults to false
      LOG.info("Cleaner has been spawned already. Waiting for it to finish");
      AsyncCleanerService.waitForCompletion(asyncCleanerService);
      LOG.info("Cleaner has finished");
    } else {
      // Do not reuse instantTime for clean as metadata table requires all changes to have unique instant timestamps.
      LOG.info("Auto cleaning is enabled. Running cleaner now");
      // Perform the clean operation
      clean();
    }
  }
}

public HoodieCleanMetadata clean() {
  return clean(HoodieActiveTimeline.createNewInstantTime());
}

public HoodieCleanMetadata clean(String cleanInstantTime) throws HoodieIOException {
  return clean(cleanInstantTime, true);
}
Everything above is just the call chain; the real logic starts here:
/**
 * Clean up any stale/old files/data lying around (either on file storage or index storage) based on the
 * configurations and CleaningPolicy used. (typically files that no longer can be used by a running query can be
 * cleaned). This API provides the flexibility to schedule clean instant asynchronously via
 * {@link AbstractHoodieWriteClient#scheduleTableService(String, Option, TableServiceType)} and disable inline scheduling
 * of clean.
 */
public HoodieCleanMetadata clean(String cleanInstantTime, boolean scheduleInline) throws HoodieIOException {
  if (scheduleInline) {
    // Main logic: create .clean.requested, whose content is the serialized cleanerPlan
    // (which contains, among other things, the list of files to delete)
    scheduleTableServiceInternal(cleanInstantTime, Option.empty(), TableServiceType.CLEAN);
  }
  LOG.info("Cleaner started");
  final Timer.Context timerContext = metrics.getCleanCtx();
  LOG.info("Cleaned failed attempts if any");
  // Decide whether to roll back failed writes; with the default EAGER policy, no rollback happens during clean
  CleanerUtils.rollbackFailedWrites(config.getFailedWritesCleanPolicy(),
      HoodieTimeline.CLEAN_ACTION, () -> rollbackFailedWrites());
  // Run table.clean: delete the files that need deleting, transition .clean.requested => .clean.inflight => .clean,
  // and return HoodieCleanMetadata. Here this resolves to HoodieJavaCopyOnWriteTable.clean
  HoodieCleanMetadata metadata = createTable(config, hadoopConf).clean(context, cleanInstantTime);
  if (timerContext != null && metadata != null) {
    long durationMs = metrics.getDurationInMs(timerContext.stop());
    metrics.updateCleanMetrics(durationMs, metadata.getTotalFilesDeleted());
    LOG.info("Cleaned " + metadata.getTotalFilesDeleted() + " files"
        + " Earliest Retained Instant :" + metadata.getEarliestCommitToRetain()
        + " cleanerElapsedMs" + durationMs);
  }
  // Return the metadata
  return metadata;
}
scheduleTableServiceInternal
private Option<String> scheduleTableServiceInternal(String instantTime, Option<Map<String, String>> extraMetadata,
                                                    TableServiceType tableServiceType) {
  switch (tableServiceType) {
    case CLUSTER:
      LOG.info("Scheduling clustering at instant time :" + instantTime);
      Option<HoodieClusteringPlan> clusteringPlan = createTable(config, hadoopConf)
          .scheduleClustering(context, instantTime, extraMetadata);
      return clusteringPlan.isPresent() ? Option.of(instantTime) : Option.empty();
    case COMPACT:
      LOG.info("Scheduling compaction at instant time :" + instantTime);
      Option<HoodieCompactionPlan> compactionPlan = createTable(config, hadoopConf)
          .scheduleCompaction(context, instantTime, extraMetadata);
      return compactionPlan.isPresent() ? Option.of(instantTime) : Option.empty();
    case CLEAN:
      LOG.info("Scheduling cleaning at instant time :" + instantTime);
      // Implemented in the subclass: HoodieJavaCopyOnWriteTable.scheduleCleaning
      Option<HoodieCleanerPlan> cleanerPlan = createTable(config, hadoopConf)
          .scheduleCleaning(context, instantTime, extraMetadata);
      return cleanerPlan.isPresent() ? Option.of(instantTime) : Option.empty();
    default:
      throw new IllegalArgumentException("Invalid TableService " + tableServiceType);
  }
}
HoodieJavaCopyOnWriteTable.scheduleCleaning
public Option<HoodieCleanerPlan> scheduleCleaning(HoodieEngineContext context, String instantTime, Option<Map<String, String>> extraMetadata) {
  return new JavaScheduleCleanActionExecutor<>(context, config, this, instantTime, extraMetadata).execute();
}

public Option<HoodieCleanerPlan> execute() {
  // Plan a new clean action
  return requestClean(instantTime);
}
/**
 * Creates a Cleaner plan if there are files to be cleaned and stores them in instant file.
 * Cleaner Plan contains absolute file paths.
 *
 * @param startCleanTime Cleaner Instant Time
 * @return Cleaner Plan if generated
 */
protected Option<HoodieCleanerPlan> requestClean(String startCleanTime) {
  // The cleanerPlan contains the list of files to be cleaned
  final HoodieCleanerPlan cleanerPlan = requestClean(context);
  if ((cleanerPlan.getFilePathsToBeDeletedPerPartition() != null)
      && !cleanerPlan.getFilePathsToBeDeletedPerPartition().isEmpty()
      && cleanerPlan.getFilePathsToBeDeletedPerPartition().values().stream().mapToInt(List::size).sum() > 0) {
    // The list of files to delete is non-empty
    // Only create cleaner plan which does some work
    // Create .clean.requested
    final HoodieInstant cleanInstant = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.CLEAN_ACTION, startCleanTime);
    // Save to both aux and timeline folder
    try {
      // Save .clean.requested; it contains the serialized cleanerPlan, including the file lists,
      // which will be used later when the files are actually deleted
      table.getActiveTimeline().saveToCleanRequested(cleanInstant, TimelineMetadataUtils.serializeCleanerPlan(cleanerPlan));
      LOG.info("Requesting Cleaning with instant time " + cleanInstant);
    } catch (IOException e) {
      LOG.error("Got exception when saving cleaner requested file", e);
      throw new HoodieIOException(e.getMessage(), e);
    }
    // Return the cleanerPlan
    return Option.of(cleanerPlan);
  }
  // Return empty
  return Option.empty();
}
/**
 * Generates List of files to be cleaned.
 *
 * @param context HoodieEngineContext
 * @return Cleaner Plan
 */
HoodieCleanerPlan requestClean(HoodieEngineContext context) {
  try {
    CleanPlanner<T, I, K, O> planner = new CleanPlanner<>(context, table, config);
    // The earliest HoodieInstant that must be retained
    Option<HoodieInstant> earliestInstant = planner.getEarliestCommitToRetain();
    // The partition paths that need cleaning
    List<String> partitionsToClean = planner.getPartitionPathsToClean(earliestInstant);

    if (partitionsToClean.isEmpty()) {
      // No partitions to clean: return an empty HoodieCleanerPlan
      LOG.info("Nothing to clean here. It is already clean");
      return HoodieCleanerPlan.newBuilder().setPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name()).build();
    }
    LOG.info("Total Partitions to clean : " + partitionsToClean.size() + ", with policy " + config.getCleanerPolicy());
    // Cleaning parallelism
    int cleanerParallelism = Math.min(partitionsToClean.size(), config.getCleanerParallelism());
    LOG.info("Using cleanerParallelism: " + cleanerParallelism);

    context.setJobStatus(this.getClass().getSimpleName(), "Generates list of file slices to be cleaned");
    // Map<partition path, files to delete>; the real work happens in planner.getDeletePaths
    Map<String, List<HoodieCleanFileInfo>> cleanOps = context
        .map(partitionsToClean, partitionPathToClean -> Pair.of(partitionPathToClean, planner.getDeletePaths(partitionPathToClean)), cleanerParallelism)
        .stream()
        .collect(Collectors.toMap(Pair::getKey, y -> CleanerUtils.convertToHoodieCleanFileInfoList(y.getValue())));

    // Build and return the HoodieCleanerPlan; the arguments are:
    // earliestInstantToRetain = a HoodieActionInstant built from earliestInstant
    // policy = config.getCleanerPolicy().name()
    // filesToBeDeletedPerPartition = CollectionUtils.createImmutableMap(), an empty read-only map
    // version = 2
    // filePathsToBeDeletedPerPartition = cleanOps, the file lists computed above
    return new HoodieCleanerPlan(earliestInstant
        .map(x -> new HoodieActionInstant(x.getTimestamp(), x.getAction(), x.getState().name())).orElse(null),
        config.getCleanerPolicy().name(), CollectionUtils.createImmutableMap(),
        CleanPlanner.LATEST_CLEAN_PLAN_VERSION, cleanOps);
  } catch (IOException e) {
    throw new HoodieIOException("Failed to schedule clean operation", e);
  }
}
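For intuition, once deserialized the resulting plan carries roughly this shape (the instant time, partition, and file name below are made up; the field names follow the constructor arguments above):

HoodieCleanerPlan {
  earliestInstantToRetain: { timestamp: "20220312091000", action: "commit", state: "COMPLETED" },
  policy: "KEEP_LATEST_COMMITS",
  filesToBeDeletedPerPartition: {},   // legacy version-1 field, left empty here
  version: 2,
  filePathsToBeDeletedPerPartition: {
    "2022/03/01": [ { filePath: "file:/tmp/hudi_trips_cow/2022/03/01/xxx.parquet", isBootstrapBaseFile: false } ]
  }
}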
HoodieJavaCopyOnWriteTable.clean
public HoodieCleanMetadata clean(HoodieEngineContext context,
                                 String cleanInstantTime) {
  return new JavaCleanActionExecutor(context, config, this, cleanInstantTime).execute();
}
BaseCleanActionExecutor.execute
public HoodieCleanMetadata execute() {
  List<HoodieCleanMetadata> cleanMetadataList = new ArrayList<>();
  // If there are inflight(failed) or previously requested clean operations, first perform them.
  // Collect clean instants in INFLIGHT or REQUESTED state: this includes the .clean.requested
  // we just created, plus any other .clean.inflight files left behind by earlier failed
  // attempts, which also need to be cleaned
  List<HoodieInstant> pendingCleanInstants = table.getCleanTimeline()
      .filterInflightsAndRequested().getInstants().collect(Collectors.toList());
  if (pendingCleanInstants.size() > 0) {
    pendingCleanInstants.forEach(hoodieInstant -> {
      LOG.info("Finishing previously unfinished cleaner instant=" + hoodieInstant);
      try {
        cleanMetadataList.add(runPendingClean(table, hoodieInstant));
      } catch (Exception e) {
        LOG.warn("Failed to perform previous clean operation, instant: " + hoodieInstant, e);
      }
    });
    table.getMetaClient().reloadActiveTimeline();
  }
  // Return the last clean metadata for now
  // TODO (NA) : Clean only the earliest pending clean just like how we do for other table services
  // This requires the BaseCleanActionExecutor to be refactored as BaseCommitActionExecutor
  return cleanMetadataList.size() > 0 ? cleanMetadataList.get(cleanMetadataList.size() - 1) : null;
}

/**
 * Executes the Cleaner plan stored in the instant metadata.
 */
HoodieCleanMetadata runPendingClean(HoodieTable<T, I, K, O> table, HoodieInstant cleanInstant) {
  try {
    // Deserialize .clean.requested or .clean.inflight back into a cleanerPlan
    HoodieCleanerPlan cleanerPlan = CleanerUtils.getCleanerPlan(table.getMetaClient(), cleanInstant);
    return runClean(table, cleanInstant, cleanerPlan);
  } catch (IOException e) {
    throw new HoodieIOException(e.getMessage(), e);
  }
}
private HoodieCleanMetadata runClean(HoodieTable<T, I, K, O> table, HoodieInstant cleanInstant, HoodieCleanerPlan cleanerPlan) {
  ValidationUtils.checkArgument(cleanInstant.getState().equals(HoodieInstant.State.REQUESTED)
      || cleanInstant.getState().equals(HoodieInstant.State.INFLIGHT));

  try {
    final HoodieInstant inflightInstant;
    final HoodieTimer timer = new HoodieTimer();
    timer.startTimer();
    if (cleanInstant.isRequested()) {
      // If this is a .clean.requested, transition it to .clean.inflight
      inflightInstant = table.getActiveTimeline().transitionCleanRequestedToInflight(cleanInstant,
          TimelineMetadataUtils.serializeCleanerPlan(cleanerPlan));
    } else {
      inflightInstant = cleanInstant;
    }
    // Run the clean method, which mainly deletes files and returns a list of HoodieCleanStats.
    // The implementation lives in the subclass, here JavaCleanActionExecutor
    List<HoodieCleanStat> cleanStats = clean(context, cleanerPlan);
    if (cleanStats.isEmpty()) {
      return HoodieCleanMetadata.newBuilder().build();
    }

    table.getMetaClient().reloadActiveTimeline();
    // Build the HoodieCleanMetadata
    HoodieCleanMetadata metadata = CleanerUtils.convertCleanMetadata(
        inflightInstant.getTimestamp(),
        Option.of(timer.endTimer()),
        cleanStats
    );
    // Create the .clean file and serialize the metadata into it
    table.getActiveTimeline().transitionCleanInflightToComplete(inflightInstant,
        TimelineMetadataUtils.serializeCleanMetadata(metadata));
    LOG.info("Marked clean started on " + inflightInstant.getTimestamp() + " as complete");
    return metadata;
  } catch (IOException e) {
    throw new HoodieIOException("Failed to clean up after commit", e);
  }
}
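The completed .clean file thus stores the serialized HoodieCleanMetadata. Judging from the usages above (getTotalFilesDeleted, getEarliestCommitToRetain, the per-partition clean stats), its content looks roughly like this (all values below are made up):

HoodieCleanMetadata {
  startCleanTime: "20220312101530",
  timeTakenInMillis: 1234,
  totalFilesDeleted: 3,
  earliestCommitToRetain: "20220312091000",
  partitionMetadata: { "2022/03/01": { deletePathPatterns, successDeleteFiles, failedDeleteFiles, ... } }
}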
JavaCleanActionExecutor.clean
List<HoodieCleanStat> clean(HoodieEngineContext context, HoodieCleanerPlan cleanerPlan) {
  // The list of files to be deleted
  Iterator<ImmutablePair<String, CleanFileInfo>> filesToBeDeletedPerPartition = cleanerPlan.getFilePathsToBeDeletedPerPartition().entrySet().stream()
      .flatMap(x -> x.getValue().stream().map(y -> new ImmutablePair<>(x.getKey(), new CleanFileInfo(y.getFilePath(), y.getIsBootstrapBaseFile())))).iterator();
  // Delete the files via the deleteFilesFunc function, returning a PartitionCleanStat per partition
  Stream<Pair<String, PartitionCleanStat>> partitionCleanStats =
      deleteFilesFunc(filesToBeDeletedPerPartition, table)
          .collect(Collectors.groupingBy(Pair::getLeft))
          .entrySet().stream()
          .map(x -> new ImmutablePair(x.getKey(), x.getValue().stream().map(y -> y.getRight()).reduce(PartitionCleanStat::merge).get()));

  Map<String, PartitionCleanStat> partitionCleanStatsMap = partitionCleanStats
      .collect(Collectors.toMap(Pair::getLeft, Pair::getRight));

  // Return PartitionCleanStat for each partition passed.
  return cleanerPlan.getFilePathsToBeDeletedPerPartition().keySet().stream().map(partitionPath -> {
    PartitionCleanStat partitionCleanStat = partitionCleanStatsMap.containsKey(partitionPath)
        ? partitionCleanStatsMap.get(partitionPath)
        : new PartitionCleanStat(partitionPath);
    HoodieActionInstant actionInstant = cleanerPlan.getEarliestInstantToRetain();
    return HoodieCleanStat.newBuilder().withPolicy(config.getCleanerPolicy()).withPartitionPath(partitionPath)
        .withEarliestCommitRetained(Option.ofNullable(
            actionInstant != null
                ? new HoodieInstant(HoodieInstant.State.valueOf(actionInstant.getState()),
                actionInstant.getAction(), actionInstant.getTimestamp())
                : null))
        .withDeletePathPattern(partitionCleanStat.deletePathPatterns())
        .withSuccessfulDeletes(partitionCleanStat.successDeleteFiles())
        .withFailedDeletes(partitionCleanStat.failedDeleteFiles())
        .withDeleteBootstrapBasePathPatterns(partitionCleanStat.getDeleteBootstrapBasePathPatterns())
        .withSuccessfulDeleteBootstrapBaseFiles(partitionCleanStat.getSuccessfulDeleteBootstrapBaseFiles())
        .withFailedDeleteBootstrapBaseFiles(partitionCleanStat.getFailedDeleteBootstrapBaseFiles())
        .build();
  }).collect(Collectors.toList());
}

private static Stream<Pair<String, PartitionCleanStat>> deleteFilesFunc(Iterator<ImmutablePair<String, CleanFileInfo>> iter, HoodieTable table) {
  Map<String, PartitionCleanStat> partitionCleanStatMap = new HashMap<>();
  FileSystem fs = table.getMetaClient().getFs();

  while (iter.hasNext()) {
    Pair<String, CleanFileInfo> partitionDelFileTuple = iter.next();
    String partitionPath = partitionDelFileTuple.getLeft();
    Path deletePath = new Path(partitionDelFileTuple.getRight().getFilePath());
    String deletePathStr = deletePath.toString();
    Boolean deletedFileResult = null;
    try {
      // Delete the file and record whether the deletion succeeded
      deletedFileResult = deleteFileAndGetResult(fs, deletePathStr);
    } catch (IOException e) {
      LOG.error("Delete file failed");
    }
    if (!partitionCleanStatMap.containsKey(partitionPath)) {
      partitionCleanStatMap.put(partitionPath, new PartitionCleanStat(partitionPath));
    }
    boolean isBootstrapBasePathFile = partitionDelFileTuple.getRight().isBootstrapBaseFile();
    PartitionCleanStat partitionCleanStat = partitionCleanStatMap.get(partitionPath);
    if (isBootstrapBasePathFile) {
      // For Bootstrap Base file deletions, store the full file path.
      partitionCleanStat.addDeleteFilePatterns(deletePath.toString(), true);
      partitionCleanStat.addDeletedFileResult(deletePath.toString(), deletedFileResult, true);
    } else {
      partitionCleanStat.addDeleteFilePatterns(deletePath.getName(), false);
      partitionCleanStat.addDeletedFileResult(deletePath.getName(), deletedFileResult, false);
    }
  }
  return partitionCleanStatMap.entrySet().stream().map(e -> Pair.of(e.getKey(), e.getValue()));
}

protected static Boolean deleteFileAndGetResult(FileSystem fs, String deletePathStr) throws IOException {
  Path deletePath = new Path(deletePathStr);
  LOG.debug("Working on delete path :" + deletePath);
  try {
    boolean deleteResult = fs.delete(deletePath, false);
    if (deleteResult) {
      LOG.debug("Cleaned file at path :" + deletePath);
    }
    return deleteResult;
  } catch (FileNotFoundException fio) {
    // With cleanPlan being used for retried cleaning operations, its possible to clean a file twice
    return false;
  }
}
Obtaining the list of files to delete
This part depends on the cleaning policy configuration and its logic is somewhat more involved, so for now only the entry points are shown without going deeper; a detailed write-up is left for later.
Option<HoodieInstant> earliestInstant = planner.getEarliestCommitToRetain();
List<String> partitionsToClean = planner.getPartitionPathsToClean(earliestInstant);
planner.getDeletePaths(partitionPathToClean);
/**
 * Returns earliest commit to retain based on cleaning policy.
 */
public Option<HoodieInstant> getEarliestCommitToRetain() {
  Option<HoodieInstant> earliestCommitToRetain = Option.empty();
  int commitsRetained = config.getCleanerCommitsRetained();
  if (config.getCleanerPolicy() == HoodieCleaningPolicy.KEEP_LATEST_COMMITS
      && commitTimeline.countInstants() > commitsRetained) {
    earliestCommitToRetain = commitTimeline.nthInstant(commitTimeline.countInstants() - commitsRetained);
  }
  return earliestCommitToRetain;
}
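A quick worked example of the retention arithmetic above (the numbers are made up):

// Assume KEEP_LATEST_COMMITS with the default retainCommits = 10 and a commit
// timeline holding 15 completed instants:
//   commitTimeline.countInstants() = 15 > 10
//   earliestCommitToRetain = commitTimeline.nthInstant(15 - 10)
// i.e. the instant at 0-based index 5, the 6th-oldest commit. The 10 newest
// commits keep all of their files; only file versions superseded before this
// instant become candidates for deletion.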
/**
 * Returns list of partitions where clean operations needs to be performed.
 *
 * @param earliestRetainedInstant New instant to be retained after this cleanup operation
 * @return list of partitions to scan for cleaning
 * @throws IOException when underlying file-system throws this exception
 */
public List<String> getPartitionPathsToClean(Option<HoodieInstant> earliestRetainedInstant) throws IOException {
  switch (config.getCleanerPolicy()) {
    case KEEP_LATEST_COMMITS:
      return getPartitionPathsForCleanByCommits(earliestRetainedInstant);
    case KEEP_LATEST_FILE_VERSIONS:
      return getPartitionPathsForFullCleaning();
    default:
      throw new IllegalStateException("Unknown Cleaner Policy");
  }
}
/**
 * Returns files to be cleaned for the given partitionPath based on cleaning policy.
 */
public List<CleanFileInfo> getDeletePaths(String partitionPath) {
  HoodieCleaningPolicy policy = config.getCleanerPolicy();
  List<CleanFileInfo> deletePaths;
  if (policy == HoodieCleaningPolicy.KEEP_LATEST_COMMITS) {
    deletePaths = getFilesToCleanKeepingLatestCommits(partitionPath);
  } else if (policy == HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS) {
    deletePaths = getFilesToCleanKeepingLatestVersions(partitionPath);
  } else {
    throw new IllegalArgumentException("Unknown cleaning policy : " + policy.name());
  }
  LOG.info(deletePaths.size() + " patterns used to delete in partition path:" + partitionPath);

  return deletePaths;
}
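As a side note, the KEEP_LATEST_BY_HOURS policy added in 0.11.0 (mentioned in the preface) plugs in as one more branch of this dispatch. A rough sketch only, with a hypothetical helper name; see PR HUDI-349 for the real implementation:

} else if (policy == HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS) {
  // Hypothetical branch: keep every file version written within the configured
  // number of hours (default 24) and mark older versions for deletion
  deletePaths = getFilesToCleanKeepingLatestHours(partitionPath);
}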
Annotated source code
https://github.com/dongkelun/hudi/tree/0.9.0-learning-comments
https://gitee.com/dongkelun/hudi/tree/0.9.0-learning-comments