
Apache Hudi Savepoint Implementation Analysis

ApacheHudi 2021-04-20

1. Introduction

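Hudi provides a savepoint mechanism that preserves the state of a given instant. If a bad commit is written later, the table can be rolled back to that savepoint, which is crucial for production systems. Savepoints are triggered manually through hudi-CLI. The following sections analyze how savepoints are implemented.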

2. Analysis

2.1 Creating a savepoint

The entry point for creating a savepoint is HoodieWriteClient#savepoint. Its core code is as follows:

  public boolean savepoint(String commitTime, String user, String comment) {
    HoodieTable<T> table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc);
    if (table.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) {
      // Savepoints are not supported for MERGE_ON_READ tables
      throw new UnsupportedOperationException("Savepointing is not supported for MergeOnRead table types");
    }
    // Last completed clean instant, if any
    Option<HoodieInstant> cleanInstant = table.getCompletedCleanTimeline().lastInstant();

    HoodieInstant commitInstant = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, commitTime);
    if (!table.getCompletedCommitsTimeline().containsInstant(commitInstant)) {
      // The commit to savepoint must exist and be completed
      throw new HoodieSavepointException("Could not savepoint non-existing commit " + commitInstant);
    }

    try {
      String lastCommitRetained;
      if (cleanInstant.isPresent()) {
        // A clean has run: deserialize its metadata
        HoodieCleanMetadata cleanMetadata = AvroUtils
            .deserializeHoodieCleanMetadata(table.getActiveTimeline().getInstantDetails(cleanInstant.get()).get());
        // Earliest commit whose files the cleaner retained
        lastCommitRetained = cleanMetadata.getEarliestCommitToRetain();
      } else {
        // No clean yet: use the earliest completed commit
        lastCommitRetained = table.getCompletedCommitsTimeline().firstInstant().get().getTimestamp();
      }
      // The savepoint time must be greater than or equal to the earliest retained commit time
      Preconditions.checkArgument(
          HoodieTimeline.compareTimestamps(commitTime, lastCommitRetained, HoodieTimeline.GREATER_OR_EQUAL),
          "Could not savepoint commit " + commitTime + " as this is beyond the lookup window " + lastCommitRetained);

      // fs is the client's FileSystem field
      Map<String, List<String>> latestFilesMap = jsc
          .parallelize(FSUtils.getAllPartitionPaths(fs, table.getMetaClient().getBasePath(),
              config.shouldAssumeDatePartitioning()))
          .mapToPair((PairFunction<String, String, List<String>>) partitionPath -> {
            ReadOptimizedView view = table.getROFileSystemView();
            // Latest data file of each file group in this partition written at or before commitTime
            List<String> latestFiles = view.getLatestDataFilesBeforeOrOn(partitionPath, commitTime)
                .map(HoodieDataFile::getFileName).collect(Collectors.toList());
            return new Tuple2<>(partitionPath, latestFiles);
          }).collectAsMap();
      // Convert to savepoint metadata
      HoodieSavepointMetadata metadata = AvroUtils.convertSavepointMetadata(user, comment, latestFilesMap);
      // Create the savepoint instant; this writes an inflight file under the meta directory
      table.getActiveTimeline().createNewInstant(
          new HoodieInstant(true, HoodieTimeline.SAVEPOINT_ACTION, commitTime));
      // Mark it complete and store the savepoint metadata in the instant file
      table.getActiveTimeline()
          .saveAsComplete(new HoodieInstant(true, HoodieTimeline.SAVEPOINT_ACTION, commitTime),
              AvroUtils.serializeSavepointMetadata(metadata));
      return true;
    } catch (IOException e) {
      throw new HoodieSavepointException("Failed to savepoint " + commitTime, e);
    }
  }

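Only COPY_ON_WRITE tables are supported here; MERGE_ON_READ tables are rejected. The method then checks whether a completed clean instant exists. If it does, the method deserializes the corresponding HoodieCleanMetadata and reads the earliest retained commit time; otherwise it uses the earliest completed commit. The savepoint time must not be older than this commit, because older file versions may already have been removed by the cleaner. Next, for every partition path, it collects the latest data files written at or before the savepoint time, converts them into a HoodieSavepointMetadata, and saves it to a file in the metadata directory. The end result of creating a savepoint is a *.savepoint file in the metadata directory.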
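The two rules above (the lookup-window check and the per-partition file selection) can be sketched in plain Java. This is a minimal model, not Hudi code: FileVersion, SavepointSketch and the file-name format are hypothetical, and the sketch assumes instant times are fixed-width yyyyMMddHHmmss strings, so that string comparison matches time order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified model of one data-file version: the partition and
// file group it belongs to, and the commit that wrote it. Not a Hudi class.
final class FileVersion {
    final String partition;
    final String fileId;
    final String commitTime;

    FileVersion(String partition, String fileId, String commitTime) {
        this.partition = partition;
        this.fileId = fileId;
        this.commitTime = commitTime;
    }

    String fileName() {
        // Illustrative name only; real Hudi data file names carry more fields.
        return fileId + "_" + commitTime + ".parquet";
    }
}

final class SavepointSketch {
    // Assumes fixed-width timestamps, so lexicographic order is time order.
    static boolean canSavepoint(String commitTime, String earliestRetained) {
        return commitTime.compareTo(earliestRetained) >= 0;
    }

    // For every partition, keep the newest version of each file group whose
    // commit time is <= savepointTime (the idea behind getLatestDataFilesBeforeOrOn).
    static Map<String, List<String>> latestFilesMap(List<FileVersion> versions, String savepointTime) {
        Map<String, Map<String, FileVersion>> latest = new TreeMap<>();
        for (FileVersion v : versions) {
            // Register the partition even if all its versions are too new,
            // so it maps to an empty list like every other partition path.
            Map<String, FileVersion> byFileId = latest.computeIfAbsent(v.partition, p -> new TreeMap<>());
            if (v.commitTime.compareTo(savepointTime) > 0) {
                continue; // written after the savepoint
            }
            FileVersion current = byFileId.get(v.fileId);
            if (current == null || v.commitTime.compareTo(current.commitTime) > 0) {
                byFileId.put(v.fileId, v);
            }
        }
        Map<String, List<String>> result = new TreeMap<>();
        for (Map.Entry<String, Map<String, FileVersion>> e : latest.entrySet()) {
            List<String> names = new ArrayList<>();
            for (FileVersion v : e.getValue().values()) {
                names.add(v.fileName());
            }
            result.put(e.getKey(), names);
        }
        return result;
    }
}
```

For example, if file group f1 has versions written at 20210419010000, 20210420010000 and 20210421010000, a savepoint at 20210420010000 records only the 20210420010000 version, and a partition whose only version is newer than the savepoint maps to an empty list.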

2.2 Rolling back to a savepoint

Once a savepoint exists, the table can be rolled back to it. The entry point is HoodieWriteClient#rollbackToSavepoint. Its core code is as follows:

  public boolean rollbackToSavepoint(String savepointTime) {
    HoodieTable<T> table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc);
    HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
    // Rolling back to a savepoint is a manual operation; no concurrent writes or compaction are expected.
    // It also removes any pending compaction scheduled after the savepoint.
    HoodieTimeline commitTimeline = table.getMetaClient().getCommitsAndCompactionTimeline();

    HoodieInstant savePoint = new HoodieInstant(false, HoodieTimeline.SAVEPOINT_ACTION, savepointTime);
    boolean isSavepointPresent = table.getCompletedSavepointTimeline().containsInstant(savePoint);
    if (!isSavepointPresent) { // the savepoint must exist
      throw new HoodieRollbackException("No savepoint for commitTime " + savepointTime);
    }

    // Instants after the savepoint, which the restore will roll back (used in the message below)
    List<String> commitsToRollback = commitTimeline.findInstantsAfter(savepointTime, Integer.MAX_VALUE).getInstants()
        .map(HoodieInstant::getTimestamp).collect(Collectors.toList());

    // Restore the table to the savepoint instant
    restoreToInstant(savepointTime);

    // Verify that the savepoint is now the last completed commit
    Option<HoodieInstant> lastInstant =
        activeTimeline.reload().getCommitsAndCompactionTimeline().filterCompletedAndCompactionInstants().lastInstant();
    Preconditions.checkArgument(lastInstant.isPresent());
    Preconditions.checkArgument(lastInstant.get().getTimestamp().equals(savepointTime),
        savepointTime + " is not the last commit after rolling back " + commitsToRollback + ", last commit was "
            + lastInstant.get().getTimestamp());
    return true;
  }

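Rolling back to a savepoint is a manual operation. The method first verifies that the savepoint instant exists, then calls restoreToInstant to restore the table to that instant, and finally checks that the savepoint is now the last completed commit on the timeline. The core code of restoreToInstant is as follows: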

  public void restoreToInstant(final String instantTime) throws HoodieRollbackException {
    HoodieTable<T> table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc);
    // All commit, delta commit and compaction instants newer than instantTime, newest first
    List<HoodieInstant> instantsToRollback = table.getActiveTimeline().getCommitsAndCompactionTimeline()
        .getReverseOrderedInstants()
        .filter(instant -> HoodieActiveTimeline.GREATER.test(instant.getTimestamp(), instantTime))
        .collect(Collectors.toList());
    // New instant time for the restore action
    String startRollbackInstant = HoodieActiveTimeline.createNewInstantTime();
    // Start the metrics timer that finishRestore stops
    final Timer.Context context = startContext();
    ImmutableMap.Builder<String, List<HoodieRollbackStat>> instantsToStats = ImmutableMap.builder();
    // Create the restore instant; this writes an inflight file under the meta directory
    table.getActiveTimeline().createNewInstant(
        new HoodieInstant(true, HoodieTimeline.RESTORE_ACTION, startRollbackInstant));
    instantsToRollback.stream().forEach(instant -> {
      try {
        switch (instant.getAction()) {
          case HoodieTimeline.COMMIT_ACTION:
          case HoodieTimeline.DELTA_COMMIT_ACTION:
            // Roll back, newest first
            List<HoodieRollbackStat> statsForInstant = doRollbackAndGetStats(instant);
            instantsToStats.put(instant.getTimestamp(), statsForInstant);
            break;
          case HoodieTimeline.COMPACTION_ACTION:
            // Compaction instants are rolled back the same way, newest first
            List<HoodieRollbackStat> statsForCompaction = doRollbackAndGetStats(instant);
            instantsToStats.put(instant.getTimestamp(), statsForCompaction);
            break;
          default:
            throw new IllegalArgumentException("invalid action name " + instant.getAction());
        }
      } catch (IOException io) {
        throw new HoodieRollbackException("unable to rollback instant " + instant, io);
      }
    });
    try {
      // Finish the restore
      finishRestore(context, instantsToStats.build(),
          instantsToRollback.stream().map(HoodieInstant::getTimestamp).collect(Collectors.toList()),
          startRollbackInstant, instantTime);
    } catch (IOException io) {
      throw new HoodieRollbackException("unable to rollback instants " + instantsToRollback, io);
    }
  }

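restoreToInstant collects every instant newer than the given one (the commit, delta commit and compaction instants to be rolled back) in reverse chronological order, creates an inflight restore instant, and then rolls back each instant in turn. The core code of doRollbackAndGetStats is as follows: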

  private List<HoodieRollbackStat> doRollbackAndGetStats(final HoodieInstant instantToRollback) throws IOException {
    // Timestamp of the instant to roll back
    final String commitToRollback = instantToRollback.getTimestamp();
    HoodieTable<T> table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc);
    HoodieTimeline inflightAndRequestedCommitTimeline = table.getPendingCommitTimeline();
    HoodieTimeline commitTimeline = table.getCompletedCommitsTimeline();
    // Timestamps of all completed savepoints: a savepointed commit must not be rolled back
    List<String> savepoints = table.getCompletedSavepointTimeline().getInstants().map(HoodieInstant::getTimestamp)
        .collect(Collectors.toList());
    if (savepoints.contains(commitToRollback)) {
      throw new HoodieRollbackException(
          "Could not rollback a savepointed commit. Delete savepoint first before rolling back " + commitToRollback);
    }

    String lastCommit = commitToRollback;
    // Enforce strict newest-to-oldest order: no completed commit may exist after this one
    if ((lastCommit != null) && !commitTimeline.empty()
        && !commitTimeline.findInstantsAfter(lastCommit, Integer.MAX_VALUE).empty()) {
      throw new HoodieRollbackException(
          "Found commits after time :" + lastCommit + ", please rollback greater commits first");
    }
    // Pending (inflight and requested) instants: if any exist, this one must be the newest of them
    List<String> inflights = inflightAndRequestedCommitTimeline.getInstants().map(HoodieInstant::getTimestamp)
        .collect(Collectors.toList());
    if ((lastCommit != null) && !inflights.isEmpty() && (inflights.indexOf(lastCommit) != inflights.size() - 1)) {
      throw new HoodieRollbackException(
          "Found in-flight commits after time :" + lastCommit + ", please rollback greater commits first");
    }
    // Perform the rollback
    List<HoodieRollbackStat> stats = table.rollback(jsc, instantToRollback, true);

    // Roll back the index
    if (!index.rollbackCommit(commitToRollback)) {
      throw new HoodieRollbackException("Rollback index changes failed, for time :" + commitToRollback);
    }
    return stats;
  }

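Rollback is strictly validated: a savepointed commit cannot be rolled back, and commits must be rolled back from the newest (largest timestamp) to the oldest (smallest timestamp). Once validation passes, rollback performs the actual rollback and deletes the corresponding instant and its files; its details will be analyzed in a separate article.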
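The newest-to-oldest rule can be illustrated with a small model. RollbackOrderSketch is hypothetical: it only models the completed-commit check (nothing completed may remain after the instant being rolled back) and ignores the separate pending-instant check, the index, and the actual file deletion. It assumes fixed-width timestamps, as above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.TreeSet;

// Hypothetical model of the ordering check in doRollbackAndGetStats.
final class RollbackOrderSketch {
    private final TreeSet<String> completed; // completed instant times, sorted

    RollbackOrderSketch(List<String> completedInstants) {
        this.completed = new TreeSet<>(completedInstants);
    }

    // Allowed only if no completed instant with a larger timestamp remains.
    boolean canRollback(String instantTime) {
        return completed.higher(instantTime) == null;
    }

    void rollback(String instantTime) {
        if (!canRollback(instantTime)) {
            throw new IllegalStateException(
                "Found commits after time :" + instantTime + ", please rollback greater commits first");
        }
        completed.remove(instantTime);
    }

    // Instants newer than targetTime, newest first, like the list restoreToInstant builds.
    static List<String> restorePlan(List<String> instants, String targetTime) {
        List<String> plan = new ArrayList<>();
        for (String t : instants) {
            if (t.compareTo(targetTime) > 0) {
                plan.add(t);
            }
        }
        plan.sort(Collections.reverseOrder());
        return plan;
    }
}
```

Walking the plan returned by restorePlan in order passes the check at every step, whereas trying to roll back 20210420030000 while 20210420040000 is still completed fails. This is why restoreToInstant iterates over getReverseOrderedInstants.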

After all instants have been rolled back, finishRestore completes the restore and persists the statistics. Its core code is as follows:

  private void finishRestore(final Timer.Context context, Map<String, List<HoodieRollbackStat>> commitToStats,
      List<String> commitsToRollback, final String startRestoreTime, final String restoreToInstant) throws IOException {
    HoodieTable<T> table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc);
    Option<Long> durationInMs = Option.empty();
    long numFilesDeleted = 0L;
    for (Map.Entry<String, List<HoodieRollbackStat>> commitToStat : commitToStats.entrySet()) {
      // Accumulate the number of deleted files over all rolled-back instants
      List<HoodieRollbackStat> stats = commitToStat.getValue();
      numFilesDeleted += stats.stream().mapToLong(stat -> stat.getSuccessDeleteFiles().size()).sum();
    }
    if (context != null) {
      // Stop the caller's timer and report rollback metrics
      durationInMs = Option.of(metrics.getDurationInMs(context.stop()));
      metrics.updateRollbackMetrics(durationInMs.get(), numFilesDeleted);
    }

    // Convert to restore metadata
    HoodieRestoreMetadata restoreMetadata =
        AvroUtils.convertRestoreMetadata(startRestoreTime, durationInMs, commitsToRollback, commitToStats);
    // Mark the restore instant complete and save the metadata under the meta directory
    table.getActiveTimeline().saveAsComplete(new HoodieInstant(true, HoodieTimeline.RESTORE_ACTION, startRestoreTime),
        AvroUtils.serializeRestoreMetadata(restoreMetadata));

    if (!table.getActiveTimeline().getCleanerTimeline().empty()) {
      // Delete older restore meta files
      FSUtils.deleteOlderRollbackMetaFiles(fs, table.getMetaClient().getMetaPath(),
          table.getActiveTimeline().getRestoreTimeline().getInstants());
    }
  }

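After the rollback, finishRestore aggregates the rollback statistics and saves them with the completed restore instant in the metadata directory; if a clean has already run, it also deletes older restore meta files.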
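The counting step in finishRestore can be modeled as follows. RollbackStat and RestoreStatsSketch are simplified, hypothetical stand-ins for HoodieRollbackStat and the loop in finishRestore. The point it illustrates is that the deleted-file count has to be accumulated over all rolled-back instants rather than reassigned on each iteration.

```java
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for HoodieRollbackStat: only the files that were
// successfully deleted while rolling back one partition.
final class RollbackStat {
    final List<String> successDeleteFiles;

    RollbackStat(List<String> successDeleteFiles) {
        this.successDeleteFiles = successDeleteFiles;
    }
}

final class RestoreStatsSketch {
    // Total number of deleted files across every rolled-back instant.
    // Assigning (=) instead of adding (+=) inside the loop would keep only
    // the count of whichever map entry happened to be visited last.
    static long totalFilesDeleted(Map<String, List<RollbackStat>> commitToStats) {
        long numFilesDeleted = 0L;
        for (List<RollbackStat> stats : commitToStats.values()) {
            numFilesDeleted += stats.stream().mapToLong(s -> s.successDeleteFiles.size()).sum();
        }
        return numFilesDeleted;
    }
}
```

With two rolled-back instants that deleted three files and one file respectively, the total is 4.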

3. Summary

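Hudi's savepoint mechanism preserves a given instant, and rollbackToSavepoint rolls the table back to a chosen savepoint. Note that rollback must start from the newest savepoint: when several savepoints exist, the table cannot be restored directly to an older one, because doing so would require rolling back a newer savepointed commit, which doRollbackAndGetStats refuses; the newer savepoint has to be deleted first. Creating a savepoint stores the information needed for a later rollback in the metadata directory, while rolling back to a savepoint rolls instants back starting from the newest one and finally writes the rollback statistics to the metadata directory.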
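The multiple-savepoint limitation can be reproduced with a toy model. MultiSavepointSketch is hypothetical and works on timestamps only; it assumes the newest-first order and the savepointed-commit check described above. One difference from Hudi: instants processed before the exception stay rolled back in Hudi, whereas the sketch simply discards its working copy when it throws.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;

// Hypothetical model of restoring to a savepoint when several savepoints exist.
final class MultiSavepointSketch {
    // Returns the commits left on the timeline after restoring to target.
    static List<String> restore(List<String> commits, Set<String> savepoints, String target) {
        if (!savepoints.contains(target)) {
            throw new IllegalArgumentException("No savepoint for commitTime " + target);
        }
        // Everything newer than the target, newest first
        List<String> toRollback = new ArrayList<>();
        for (String c : commits) {
            if (c.compareTo(target) > 0) {
                toRollback.add(c);
            }
        }
        toRollback.sort(Collections.reverseOrder());

        List<String> remaining = new ArrayList<>(commits);
        for (String c : toRollback) {
            // Same rule as doRollbackAndGetStats: never roll back a savepointed commit
            if (savepoints.contains(c)) {
                throw new IllegalStateException(
                    "Could not rollback a savepointed commit. Delete savepoint first before rolling back " + c);
            }
            remaining.remove(c);
        }
        return remaining;
    }
}
```

With savepoints at 20210420020000 and 20210420030000, restoring to 20210420030000 succeeds, restoring to 20210420020000 fails when it reaches 20210420030000, and after deleting the newer savepoint, restoring to 20210420020000 succeeds.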


This article is reposted from ApacheHudi.
