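1. Introduction
Hudi provides a savepoint mechanism that backs up an instant so that, if a later commit turns out to be bad, the table can be rolled back to the chosen savepoint. This matters a great deal for production systems. Savepoints are triggered manually through hudi-CLI. The sections below analyze how savepoints are implemented.
2. Analysis
2.1 Creating a savepoint
The entry point for creating a savepoint is HoodieWriteClient#savepoint. Its core code is as follows: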
public boolean savepoint(String commitTime, String user, String comment) {
  HoodieTable<T> table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc);
  if (table.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) {
    // Savepoints are not supported for MERGE_ON_READ tables
    throw new UnsupportedOperationException("Savepointing is not supported or MergeOnRead table types");
  }
  // Get the last instant among the completed cleans
  Option<HoodieInstant> cleanInstant = table.getCompletedCleanTimeline().lastInstant();
  HoodieInstant commitInstant = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, commitTime);
  if (!table.getCompletedCommitsTimeline().containsInstant(commitInstant)) {
    // The commit to savepoint does not exist
    throw new HoodieSavepointException("Could not savepoint non-existing commit " + commitInstant);
  }
  try {
    String lastCommitRetained;
    if (cleanInstant.isPresent()) { // A completed clean instant exists
      // Deserialize the clean metadata
      HoodieCleanMetadata cleanMetadata = AvroUtils
          .deserializeHoodieCleanMetadata(table.getActiveTimeline().getInstantDetails(cleanInstant.get()).get());
      // The earliest commit that the cleaner retains
      lastCommitRetained = cleanMetadata.getEarliestCommitToRetain();
    } else {
      // Otherwise fall back to the earliest completed commit
      lastCommitRetained = table.getCompletedCommitsTimeline().firstInstant().get().getTimestamp();
    }
    // The savepoint time must be greater than or equal to the earliest retained commit time
    Preconditions.checkArgument(
        HoodieTimeline.compareTimestamps(commitTime, lastCommitRetained, HoodieTimeline.GREATER_OR_EQUAL),
        "Could not savepoint commit " + commitTime + " as this is beyond the lookup window " + lastCommitRetained);
    Map<String, List<String>> latestFilesMap = jsc
        .parallelize(FSUtils.getAllPartitionPaths(fs, table.getMetaClient().getBasePath(),
            config.shouldAssumeDatePartitioning()))
        .mapToPair((PairFunction<String, String, List<String>>) partitionPath -> {
          ReadOptimizedView view = table.getROFileSystemView();
          // Collect the latest data files in this partition written at or before commitTime
          List<String> latestFiles = view.getLatestDataFilesBeforeOrOn(partitionPath, commitTime)
              .map(HoodieDataFile::getFileName).collect(Collectors.toList());
          return new Tuple2<>(partitionPath, latestFiles);
        }).collectAsMap();
    // Convert to savepoint metadata
    HoodieSavepointMetadata metadata = AvroUtils.convertSavepointMetadata(user, comment, latestFilesMap);
    // Create the savepoint instant; this writes an inflight file under the meta directory
    table.getActiveTimeline().createNewInstant(
        new HoodieInstant(true, HoodieTimeline.SAVEPOINT_ACTION, commitTime));
    // Mark the instant as complete and store the savepoint metadata in its file
    table.getActiveTimeline()
        .saveAsComplete(new HoodieInstant(true, HoodieTimeline.SAVEPOINT_ACTION, commitTime),
            AvroUtils.serializeSavepointMetadata(metadata));
    return true;
  } catch (IOException e) {
    throw new HoodieSavepointException("Failed to savepoint " + commitTime, e);
  }
}
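As shown above, the method first checks whether a completed clean instant exists. If so, it deserializes the corresponding HoodieCleanMetadata and reads the earliest commit to retain; otherwise it uses the first completed commit. After verifying that the savepoint time is not earlier than that commit, it collects, for every partition path, the latest data files written at or before the savepoint time, converts them into HoodieSavepointMetadata, and saves the result to a file under the metadata directory. The net result of creating a savepoint is a *.savepoint file in the metadata directory.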
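The two data-dependent steps above, the lookup-window check and the per-partition file selection, can be sketched in isolation. This is a minimal illustration and not Hudi code: SavepointSketch, DataFile, isWithinLookupWindow and latestFilesBeforeOrOn are hypothetical names. It assumes that instant timestamps are fixed-width strings, so that string order matches time order, and that getLatestDataFilesBeforeOrOn keeps the newest file per file group.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Illustrative only: none of these names are Hudi APIs. */
final class SavepointSketch {

    /** One version of a data file: its file group, the commit that wrote it, and its name. */
    static final class DataFile {
        final String fileId;
        final String commitTime;
        final String fileName;

        DataFile(String fileId, String commitTime, String fileName) {
            this.fileId = fileId;
            this.commitTime = commitTime;
            this.fileName = fileName;
        }
    }

    /** Assumption: timestamps are fixed-width, so lexicographic order is chronological order. */
    static boolean isWithinLookupWindow(String commitTime, String earliestRetained) {
        return commitTime.compareTo(earliestRetained) >= 0;
    }

    /** For each file group, keep the newest file written at or before commitTime. */
    static List<String> latestFilesBeforeOrOn(List<DataFile> files, String commitTime) {
        Map<String, DataFile> latestPerGroup = new LinkedHashMap<>();
        for (DataFile f : files) {
            if (f.commitTime.compareTo(commitTime) > 0) {
                continue; // written after the savepoint, so not part of it
            }
            DataFile current = latestPerGroup.get(f.fileId);
            if (current == null || f.commitTime.compareTo(current.commitTime) > 0) {
                latestPerGroup.put(f.fileId, f);
            }
        }
        List<String> names = new ArrayList<>();
        for (DataFile f : latestPerGroup.values()) {
            names.add(f.fileName);
        }
        return names;
    }

    public static void main(String[] args) {
        List<DataFile> partitionFiles = Arrays.asList(
            new DataFile("f1", "001", "f1_001"),
            new DataFile("f1", "003", "f1_003"),
            new DataFile("f2", "002", "f2_002"),
            new DataFile("f1", "005", "f1_005"));
        System.out.println(isWithinLookupWindow("003", "002"));
        System.out.println(latestFilesBeforeOrOn(partitionFiles, "003"));
    }
}
```

In this sketch a savepoint at "003" would record f1_003 and f2_002 for the partition, while f1_005 is ignored because it was written later.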
2.2 Rolling back to a savepoint
Once a savepoint exists, the table can be rolled back to it. The entry point is HoodieWriteClient#rollbackToSavepoint, and its core code is as follows:
public boolean rollbackToSavepoint(String savepointTime) {
  HoodieTable<T> table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc);
  HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
  // Rollback is a manual operation and does not support concurrent writes or compaction;
  // it removes any compactions still pending after the savepoint
  HoodieTimeline commitTimeline = table.getMetaClient().getCommitsAndCompactionTimeline();
  HoodieInstant savePoint = new HoodieInstant(false, HoodieTimeline.SAVEPOINT_ACTION, savepointTime);
  boolean isSavepointPresent = table.getCompletedSavepointTimeline().containsInstant(savePoint);
  if (!isSavepointPresent) { // Check that the savepoint instant exists
    throw new HoodieRollbackException("No savepoint for commitTime " + savepointTime);
  }
  // Restore the table to the savepoint's instant
  restoreToInstant(savepointTime);
  // After the restore, the last completed instant must be the savepoint itself
  // NOTE: commitsToRollback is not defined in this excerpt
  Option<HoodieInstant> lastInstant =
      activeTimeline.reload().getCommitsAndCompactionTimeline().filterCompletedAndCompactionInstants().lastInstant();
  Preconditions.checkArgument(lastInstant.isPresent());
  Preconditions.checkArgument(lastInstant.get().getTimestamp().equals(savepointTime),
      savepointTime + "is not the last commit after rolling back " + commitsToRollback + ", last commit was "
          + lastInstant.get().getTimestamp());
  return true;
}
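As shown above, rolling back to a savepoint must be triggered manually. The method checks that the savepoint instant exists and then calls restoreToInstant to restore the table to that instant. The core code of restoreToInstant is as follows: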
public void restoreToInstant(final String instantTime) throws HoodieRollbackException {
  HoodieTable<T> table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc);
  // Select the instants later than the given commit time, newest first
  List<HoodieInstant> instantsToRollback = table.getActiveTimeline().getCommitsAndCompactionTimeline()
      .getReverseOrderedInstants()
      .filter(instant -> HoodieActiveTimeline.GREATER.test(instant.getTimestamp(), instantTime))
      .collect(Collectors.toList());
  // Generate a new instant time for the restore
  String startRollbackInstant = HoodieActiveTimeline.createNewInstantTime();
  // Collects the rollback statistics of each instant
  ImmutableMap.Builder<String, List<HoodieRollbackStat>> instantsToStats = ImmutableMap.builder();
  // Create the restore instant; this writes an inflight file under the metadata directory
  table.getActiveTimeline().createNewInstant(
      new HoodieInstant(true, HoodieTimeline.RESTORE_ACTION, startRollbackInstant));
  instantsToRollback.stream().forEach(instant -> {
    try {
      switch (instant.getAction()) {
        case HoodieTimeline.COMMIT_ACTION:
        case HoodieTimeline.DELTA_COMMIT_ACTION:
          // Roll back, from the latest instant to the earliest
          List<HoodieRollbackStat> statsForInstant = doRollbackAndGetStats(instant);
          instantsToStats.put(instant.getTimestamp(), statsForInstant);
          break;
        case HoodieTimeline.COMPACTION_ACTION:
          // Roll back, from the latest instant to the earliest
          List<HoodieRollbackStat> statsForCompaction = doRollbackAndGetStats(instant);
          instantsToStats.put(instant.getTimestamp(), statsForCompaction);
          break;
        default:
          throw new IllegalArgumentException("invalid action name " + instant.getAction());
      }
    } catch (IOException io) {
      throw new HoodieRollbackException("unable to rollback instant " + instant, io);
    }
  });
  try {
    // Finish the restore
    // NOTE: context (a Timer.Context) is not defined in this excerpt
    finishRestore(context, instantsToStats.build(),
        instantsToRollback.stream().map(HoodieInstant::getTimestamp).collect(Collectors.toList()),
        startRollbackInstant, instantTime);
  } catch (IOException io) {
    throw new HoodieRollbackException("unable to rollback instants " + instantsToRollback, io);
  }
}
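As shown above, the method selects every instant later than the given one (both commit and compaction instants pending rollback) and processes them one by one, newest first. The core code of doRollbackAndGetStats is as follows: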
private List<HoodieRollbackStat> doRollbackAndGetStats(final HoodieInstant instantToRollback) throws
    IOException {
  // The commit time of the instant to roll back
  final String commitToRollback = instantToRollback.getTimestamp();
  HoodieTable<T> table = HoodieTable.getHoodieTable(
      createMetaClient(true), config, jsc);
  HoodieTimeline inflightAndRequestedCommitTimeline = table.getPendingCommitTimeline();
  HoodieTimeline commitTimeline = table.getCompletedCommitsTimeline();
  // Check if any of the commits is a savepoint - do not allow rollback on those commits
  // Collect the timestamps of all completed savepoints
  List<String> savepoints = table.getCompletedSavepointTimeline().getInstants().map(HoodieInstant::getTimestamp)
      .collect(Collectors.toList());
  savepoints.stream().forEach(s -> {
    if (s.contains(commitToRollback)) { // The commit is savepointed: rolling it back is not allowed
      throw new HoodieRollbackException(
          "Could not rollback a savepointed commit. Delete savepoint first before rolling back" + s);
    }
  });
  String lastCommit = commitToRollback;
  // Enforce rollback strictly from the latest commit to the earliest
  if ((lastCommit != null) && !commitTimeline.empty()
      && !commitTimeline.findInstantsAfter(lastCommit, Integer.MAX_VALUE).empty()) {
    throw new HoodieRollbackException(
        "Found commits after time :" + lastCommit + ", please rollback greater commits first");
  }
  // Collect the timestamps of instants in the inflight and requested states
  List<String> inflights = inflightAndRequestedCommitTimeline.getInstants().map(HoodieInstant::getTimestamp)
      .collect(Collectors.toList());
  if ((lastCommit != null) && !inflights.isEmpty() && (inflights.indexOf(lastCommit) != inflights.size() - 1)) {
    throw new HoodieRollbackException(
        "Found in-flight commits after time :" + lastCommit + ", please rollback greater commits first");
  }
  // Perform the rollback
  List<HoodieRollbackStat> stats = table.rollback(jsc, instantToRollback, true);
  // Roll back the index
  if (!index.rollbackCommit(commitToRollback)) {
    throw new HoodieRollbackException("Rollback index changes failed, for time :" + commitToRollback);
  }
  return stats;
}
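As shown above, a rollback is strictly validated: a savepointed commit cannot be rolled back, and commits must be rolled back from the latest (largest timestamp) to the earliest (smallest timestamp). Once validation passes, rollback is called to perform the actual rollback, which deletes the corresponding instant and files. Those details will be analyzed separately.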
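The selection and ordering rules described above can be sketched independently of Hudi. In the following minimal illustration, RestorePlanSketch and instantsToRollback are hypothetical names, instants are reduced to their timestamp strings, and timestamps are assumed to be fixed-width so that string comparison matches time order. As a simplification, the savepoint check is done up front for the whole plan, whereas the code quoted above performs it per instant at rollback time.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Illustrative only: none of these names are Hudi APIs. */
final class RestorePlanSketch {

    /**
     * Returns the instants later than restoreTo, newest first.
     * Fails if any of them is still protected by a savepoint.
     */
    static List<String> instantsToRollback(List<String> timeline, String restoreTo, Set<String> savepoints) {
        List<String> plan = new ArrayList<>();
        for (String timestamp : timeline) {
            if (timestamp.compareTo(restoreTo) <= 0) {
                continue; // at or before the restore target, so it is kept
            }
            if (savepoints.contains(timestamp)) {
                throw new IllegalStateException(
                    "Delete savepoint " + timestamp + " before restoring to " + restoreTo);
            }
            plan.add(timestamp);
        }
        // Newest first, so that no instant is rolled back while a later one still exists
        plan.sort(Collections.reverseOrder());
        return plan;
    }

    public static void main(String[] args) {
        List<String> timeline = Arrays.asList("001", "002", "003", "004");
        Set<String> savepoints = new HashSet<>(Arrays.asList("002"));
        System.out.println(instantsToRollback(timeline, "002", savepoints));
    }
}
```

With savepoints at both "002" and "003", restoring to "002" would fail in this sketch until the savepoint at "003" is deleted, which mirrors the restriction on savepointed commits.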
After the rollback, finishRestore is called to finish the restore and persist some statistics. Its core code is as follows:
private void finishRestore(final Timer.Context context, Map<String, List<HoodieRollbackStat>> commitToStats,
    List<String> commitsToRollback, final String startRestoreTime, final String restoreToInstant) throws IOException {
  HoodieTable<T> table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc);
  Option<Long> durationInMs = Option.empty();
  Long numFilesDeleted = 0L;
  for (Map.Entry<String, List<HoodieRollbackStat>> commitToStat : commitToStats.entrySet()) {
    // Count the deleted files
    List<HoodieRollbackStat> stats = commitToStat.getValue();
    numFilesDeleted = stats.stream().mapToLong(stat -> stat.getSuccessDeleteFiles().size()).sum();
  }
  // Convert to restore metadata
  HoodieRestoreMetadata restoreMetadata =
      AvroUtils.convertRestoreMetadata(startRestoreTime, durationInMs, commitsToRollback, commitToStats);
  // Save to the meta directory
  table.getActiveTimeline().saveAsComplete(new HoodieInstant(true, HoodieTimeline.RESTORE_ACTION, startRestoreTime),
      AvroUtils.serializeRestoreMetadata(restoreMetadata));
  if (!table.getActiveTimeline().getCleanerTimeline().empty()) {
    // Clean up older meta files
    FSUtils.deleteOlderRollbackMetaFiles(fs, table.getMetaClient().getMetaPath(),
        table.getActiveTimeline().getRestoreTimeline().getInstants());
  }
}
As shown above, statistics are generated after the rollback and then stored in the metadata directory.
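One detail of the loop in finishRestore: numFilesDeleted is assigned on each iteration rather than added to, so after the loop it holds the count for whichever entry was visited last. If a total across all rolled-back instants is what is wanted, the aggregation could be sketched as follows. This is an illustration under that assumption and not a patch to Hudi: RestoreStatsSketch and totalDeletedFiles are hypothetical names, and each integer stands in for stat.getSuccessDeleteFiles().size().

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Illustrative only: none of these names are Hudi APIs. */
final class RestoreStatsSketch {

    /** Sums the per-stat deleted-file counts over every rolled-back instant. */
    static long totalDeletedFiles(Map<String, List<Integer>> deletedCountsByInstant) {
        long total = 0L;
        for (List<Integer> counts : deletedCountsByInstant.values()) {
            for (int count : counts) {
                total += count; // accumulate instead of overwriting
            }
        }
        return total;
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> deletedCountsByInstant = new LinkedHashMap<>();
        deletedCountsByInstant.put("004", Arrays.asList(2, 3));
        deletedCountsByInstant.put("003", Arrays.asList(1));
        System.out.println(totalDeletedFiles(deletedCountsByInstant));
    }
}
```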
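3. Summary
Hudi's savepoint mechanism backs up a given instant, and a rollback can later return the table to that savepoint. Note that rollback has to proceed from the most recent savepoint: when several savepoints exist, the table cannot be restored directly to an earlier one, because doRollbackAndGetStats refuses to roll back a savepointed commit until that savepoint is deleted. Creating a savepoint stores the information needed for a later rollback (the latest data files of each partition) in the metadata directory. Rolling back to a savepoint rolls back instants starting from the latest one and finally writes the rollback statistics to the metadata directory.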







