导语
作为构建新一代数据湖的三个中间件Apache Iceberg, Apache Hudi, Delta Lake都支持Schema Evolution,但是三者的支持能力不尽相同,其中Iceberg宣称支持 Full Schema Evolution。本文将详细分析Iceberg 的Full Schema Evolution, 同时捎带对比下和Delta Lake以及Hudi的Schema Evolution的不同。
为什么需要Schema Evolution
用户的数据随着时间和业务量的增长会需要有一些格式上的变化,例如添加新的纬度,更细的分区粒度等。传统的Hive表如果想要处理这些变化可能需要创建一个新的表,将旧的数据读出来再写到新的表里。如果表的分区粒度也需要发生变化,例如,分区从天变成小时,那么还需要上层更改相关的查询语句,甚至还有引起正确性的问题。所以表结构更新Schema Evolution是新一代数据湖的一个重要特性, 使用新新一代数据湖的Schema Evolution特性很容易的对表的结构进行一些微调,例如添加某些列,从而满足用户数据变化的需求。
作为构建新一代数据湖的主流中间件,Apache Iceberg支持Full Schema Evolution的功能,包括添加列,删除列,更新列,更新分区列等操作。用户可以任意的对表的结构进行in-place的更新,包括对普通列以及嵌套类型的列进行结构更新,甚至当用户的存储更换时还支持对分区的列进行更新,后面的内容将详细介绍这个功能的重要作用。值得一提的是,Iceberg的表结构更新是内在的元信息更新,不需要花费数据迁移或者数据重写的代价。
Iceberg支持的表结构更新操作如下:
添加列:添加一个列到表,支持添加到嵌套类型列 删除列:删除表的某个列,支持删除嵌套类型 重命名列:重命名一个现有的列,支持嵌套类型 更新列:支持将列改的更宽,例如,Integer 更新成Long,支持更新struct, map, list复杂类型 重排序列:支持更新列的顺序,包括嵌套类型里面列的顺序
同时由于Iceberg的Schema逻辑是独立与上层引擎和底层文件格式,所以Iceberg 的schema evolution可以保证:
添加的列不会去读现有列上的数据 删除一个列不会影响其他列 更新一个列不会影响其他列的值 更新列顺序不会影响对应列的值
如何使用Iceberg Schema Evolution
首先,这里定义一个Iceberg的表结构如下:
final Schema SCHEMA = new Schema(required(1, "id", Types.IntegerType.get()),optional(2, "data", Types.StringType.get()),optional(3, "preferences", Types.StructType.of(required(8, "feature1", Types.BooleanType.get()),optional(9, "feature2", Types.BooleanType.get())), "struct of named boolean options"),required(4, "locations", Types.MapType.ofRequired(10, 11,Types.StructType.of(required(, "address", Types.StringType.get()),required(, "city", Types.StringType.get()),required(, "state", Types.StringType.get()),required(, "zip", Types.IntegerType.get())),Types.StructType.of(required(, "lat", Types.FloatType.get()),required(17, "long", Types.FloatType.get()))), "map of address to coordinate"),optional(5, "points", Types.ListType.ofOptional(,Types.StructType.of(required(19, "x", Types.LongType.get()),required(, "y", Types.LongType.get()))), "2-D cartesian points"),required(6, "doubles", Types.ListType.ofRequired(17,Types.DoubleType.get())),optional(7, "properties", Types.MapType.ofOptional(, ,Types.StringType.get(),Types.StringType.get()), "string map of properties"));
Column Evolution
添加列
//添加一个顶级列table.UpdateSchema().addColumn("toplevel",Types.DecimalType.of(9, 2)).commit()//添加一个列到嵌套类型里面table.UpdateSchema().addColumn("points", "z",Types.LongType.get(), "z axis").commit()
更新列
// 更新列类型table.UpdateSchema().updateColumn("id",Types.LongType.get()).commit()// 更新嵌套类里面子列类型table.UpdateSchema().updateColumn("locations.lat",Types.DoubleType.get()).commit()table.UpdateSchema().updateColumn("locations.long",Types.DoubleType.get()).commit()
重命名列
// 重命名顶级列table.UpdateSchema().renameColumn("data", "json").commit()// 重命名嵌套类型子列table.UpdateSchema().renameColumn("preferences.feature2", "newfeature").commit()table.UpdateSchema().renameColumn("locations.lat", "latitude").commit()table.UpdateSchema().renameColumn("points.x", "X").commit()
删除列
table.UpdateSchema().deleteColumn("points.z").commit();
Partition Evolution
和Spark与Hive不同,Iceberg采用的分区方式是隐式分区,这种隐式分区的方式使得分区更加灵活,可以通过以某些列作为输入,然后指定一个变换函数结合起来作为一个分区格式。例如,假设表的设计里面有一列是event_time,那么可能的分区方法有:
date(event_time):根据日期分区 mouth(event_time):根据月分区 year(event_time):根据年分区 day(event_time):根据天分区 bucket(event_time, 10):分成10个桶 identity(event_time):根据event_time分区 truncate(event_time, 5):根据event_time前5位的宽度分区
当然如果你觉得这些都不够用, 那么你还可以自己写一个Transform 接口的实现来指定分区策略。
在隐式分区技术的基础上,Iceberg实现了Partition Evolution,这个功能可以让上层的查询语句不需要做任何的更新,仍然可以无缝的使用分区过滤功能。这非常关键,例如原先你的数据是按照日期进行分区,随着数据不断增长原先分区里的数据越来越多,分区过滤后数据还是很多,现在想换下分区策略,改成按小时分区。原先的一个查询SQL:
SELECT level, count(1) as countFROM logsWHERE event_timeBETWEEN '2018-12-01 10:00:00' AND '2018-12-01 12:00:00'AND event_date = '2018-12-01'
如果是不使用Iceberg, 那么正常的变更分区方式可能需要更新表,重新添加一个虚拟列hour作为分区列。同时需要添加一个条件语句是 and hour = '10', 如果上层查询语句没有更新,那么分区过滤也无效。
如果使用Iceberg,可以用下面代码更新下分区策略:
//可以通过 table = (BaseTable)origTable的方式获取内部table实例TableMetadata current = table.operations().current();PartitionSpec newSpec = PartitionSpec.builderFor(table.schema()).hour("event_time").withSpecId(1).build();table.ops().commit(current, current.updatePartitionSpec(newSpec));
更新好策略之后,不需要做任何的数据迁移,上层查询语句不需要做任何变化即可直接用上分区过滤功能。是不是节省了很多事
Schema Evolution内核剖析
与Delta Lake和Hudi不同,Iceberg有自己独立定义的Schema,它定义了field id, field name到NestedField
的映射,同时还定义了一系列的visitor用于访问和更新Schema。通过这套独立的Schema逻辑以及一系列visitor,Iceberg Schema可以不用和Spark 的Schema以及底层的文件格式Schema耦合,从而实现 full schema evolution。
丰富的Schema Visitor
Iceberg为方便对Schema操作以及与引擎和底层文件格式之间的转换,定义了一系列的Visitor。
与Spark Type转换的Visitor
上面[Iceberg Schema Evolution](如何使用Iceberg Schema Evolution)的使用中我们定义了一个典型的Iceberg Schema, 例子中我们可以看出Iceberg Schema的定义和Parquet表的定义很类似,支持嵌套类型array, list, map,它可以很容易的和Spark的StructType进行转换:
//从Spark表获取Schema并转成Iceberg SchemaSchema schema = SparkSchemaUtil.schemaForTable(sparkSession, tableName);//从Spark表获取Partition信息,转成Iceberg PartitionSpecPartitionSpec spc = SparkSchemaUtil.specForTable(spark, tableName);// Spark StructType和Iceberg Schema互相Schema schema = SparkSchemaUtil.convert(structType);StructType struct = SparkSchemaUtil.convert(schema);
这里主要得益于两个Schema Visitor, TypeToSparkType 和SparkTypeToType, 这两个visitor可以对已有的表结构进行深度遍历,并在遍历同时生成另外一种格式的表结构。我们看下其中一个的具体visit方法:
static <T> T visit(DataType type, SparkTypeVisitor<T> visitor) {if (type instanceof StructType) {StructField[] fields = ((StructType) type).fields();List<T> fieldResults = Lists.newArrayListWithExpectedSize(fields.length);for (StructField field : fields) {fieldResults.add(visitor.field(field,visit(field.dataType(), visitor)));}return visitor.struct((StructType) type, fieldResults);} else if (type instanceof MapType) {return visitor.map((MapType) type,visit(((MapType) type).keyType(), visitor),visit(((MapType) type).valueType(), visitor));} else if (type instanceof ArrayType) {return visitor.array((ArrayType) type,visit(((ArrayType) type).elementType(), visitor));} else if (type instanceof UserDefinedType) {throw new UnsupportedOperationException("User-defined types are not supported");} else {return visitor.atomic(type);}}
这是典型的visitor模式,通过深度遍历Schema 访问内部结构构建想要的结果。Schema相关的许多操作都是通过Visitor模式完成。例如分配ID, Schema更新。
与Avro和Parquet 类型转换的 Visitor
与Parquet MessageType互相转换的Visitor: MessageTypeToType和TypeToMessageType
与Avro Schema互相转换的Visitor : TypeToSchema和SchemaToType
分配Field ID的Visitor
分配由AssignFreshIds这个visitor来完成,AssignFreshIds负责对一个Type分配新的Id, 由于Type是一个嵌套类型,所以需要由visitor来遍历。值得注意的,AssignFreshId分配id的方式并不是先序深度遍历,它在访问节点时先分配了当前所有子节点id,然后将子节点的访问作为Future保存,这样就有点类似广度优先。这一点可以从我们上面的示例Schema中看出。
更新Schema的Visitor
Schema更新由ApplyChanges这个Schema Visitor来完成,它包含了新增列,删除列,更新列三个操作,这三个更新操作分别会生成新的列的集合如下:
新增列:用一个MultiMap<Integer, NestedField> adds表示,key是parent field, value是新增的filed集合 删除列:用一个Set deletes表示 更新列:用一个Map<Integer, NestedField> updates表示
当用户需要commit这些更新时,ApplyChanges的在遍历当前Schema的时候会分别查询下当前的field的id是否在adds, deletes, updates三个集合中,如果正好在的话就会进行相应的处理,最后ApplyChanges会生成一个新的Schema文件,值得注意的是,那些没有变化的field它们的field ID也没有变化。这就保证了一定的兼容性。
总结
Schema Evolution是新一代数据湖必备的技能,不管是Iceberg, Delta Lake, Hudi都宣称自己有Schema Evolution的功能,但只有Iceberg真正做到full schema evolution。Iceberg的亮点在于它把Schema的逻辑独立抽象出来,使得Schema和Partition与上层引擎和底层的文件格式解耦做到更强大的功能。
欢迎阅读其他Iceberg系列文章
欢迎关注“数据湖技术”公众号,更多精彩待续!





