本期项目经验分享来自乔奥同学,承担的项目是【PolarDB-X 支持 UDF】。

一、About
关于 PolarDB 开源社区
PolarDB 开源社区是阿里云数据库开源产品 PolarDB 的技术交流平台。作为一款开源的数据库产品, 离不开用户和开发者的支持, 大家可以在社区针对 PolarDB 产品提问题、功能需求、交流使用心得、分享最佳实践、提交 issue、贡献代码等。
PolarDB-X 是一款面向超高并发、海量存储、复杂查询场景设计的云原生分布式数据库系统。其采用 Shared-nothing 与存储计算分离架构,支持水平扩展、分布式事务、混合负载等能力,具备企业级、云原生、高可用、高度兼容 MySQL 系统及生态等特点。
PolarDB-X 最初为解决阿里巴巴天猫“双十一”核心交易系统数据库扩展性瓶颈而生,之后伴随阿里云一路成长,是一款经过多种核心业务场景验证的、成熟稳定的数据库系统。
PolarDB-X 是阿里巴巴自主设计研发的高性能云原生分布式数据库产品,为用户提供高吞吐、大存储、低延时、易扩展和超高可用的云时代数据库服务。PolarDB-X 始终保持对阿里巴巴集团“双十一购物狂欢节”所有相关业务的全面支撑。历经十余年淬炼,PolarDB-X 具备了强数据一致性、高系统稳定性、快速集群弹性等核心关键特性,并在司法财税、交通物流、电力能源等公共事业领域有广泛深入应用。PolarDB-X 坚定遵循自主可控、开放生态的发展思路,持续围绕 MySQL 开源生态构建分布式能力,以求最大程度降低用户的学习使用成本。
作为全球数据库领导者,阿里云数据库坚定拥抱开源,多年来积极参与开源社区建设,为 MySQL、PostgreSQL 等社区做过多项贡献。2021 年,阿里云把数据库开源作为重要战略方向,正式开源自研核心数据库产品 PolarDB,助力开发者和客户通过开源版本快速使用阿里云数据库产品技术,并参与到技术产品的迭代过程中来。2021 年 5 月,阿里云率先开源 PolarDB for PostgreSQL 分布式版,在 10 月的云栖大会上,阿里云进一步开源了云原生分布式数据库 PolarDB-X 和 PolarDB for PostgreSQL 共享存储版,聚合社区力量,繁荣云原生分布式数据库生态,服务广大开发者,推动技术变革。
2022 年 10 月份,PolarDB-X 正式发布 2.2.0 版本,这是一个重要的里程碑版本,重点推出符合分布式数据库金融标准下的企业级和国产 ARM 适配,共包括八大核心特性,全面提升 PolarDB-X 分布式数据库在金融、通讯、政务等行业的普适性。
项目介绍
项目编号:2209e0087
项目链接:https://summer-ospp.ac.cn/#/org/prodetail/2209e0087
项目导师:玄弟
·支持系统视图,可以方便查询和管理已注册 UDF
二、项目开发方案描述
方案介绍
UDF 全称用户自定义函数,用于扩展数据库中的系统内置函数,便于用户实现自定义功能。本文将从如下几个方面对 PolarDB-X 中的 Java UDF 设计实现方案进行介绍。
UDF 在其他常见数据库中的实现
Snowflake
create or replace function concat_varchar_2(a ARRAY)
returns varchar
language java
handler='TestFunc_2.concatVarchar2'
target_path='@~/TestFunc_2.jar'
as
$$
class TestFunc_2 {
public static String concatVarchar2(String[] strings) {
return String.join(" ", strings);
}
}
$$;- 无需继承,自定义类和方法即可
- 需指定 handler 以及目标路径
MaxCompute
代码嵌入式 UDF:
CREATE TEMPORARY FUNCTION foo AS 'com.mypackage.Reverse' USING
#CODE ('lang'='JAVA')
package com.mypackage;
import com.aliyun.odps.udf.UDF;
public class Reverse extends UDF {
public String evaluate(String input) {
if (input == null) return null;
StringBuilder ret = new StringBuilder();
for (int i = input.toCharArray().length - 1; i >= 0; i--) {
ret.append(input.toCharArray()[i]);
}
return ret.toString();
}
}
#END CODE;
SELECT foo('abdc');- 嵌入式代码块可以置于`USING`后或脚本末尾,置于`USING`后的代码块作用域仅为`CREATE TEMPORARY FUNCTION`语句。
- `CREATE TEMPORARY FUNCTION`创建的函数为临时函数,仅在本次执行生效,不会存入 MaxCompute 的 Meta 系统。
HIVE
hive> add jar /home/data/HiveUDF.jar;
hive> create temporary function zodiac as 'mastercom.hive.udf.ZodiacUDF'; #注册临时函数
hive> create function hive.zodiac as 'mastercom.hive.udf.ZodiacUDF' using jar 'hdfs://192.168.1.101:8020/script/HiveUDF.jar'; #永久注册函数- 用户代码需要继承 UDF 类。
- 实现 evaluate()方法,UDF 实现的功能在 evaluate 里实现。Hive 根据类名来创建 UDF,调用的时候根据 evaluate 参数来调用不同的方法,实现不同的功能。
StarRocks
CREATE FUNCTION MY_UDF_JSON_GET(string, string)
RETURNS string
properties (
"symbol" = "com.starrocks.udf.sample.UDFJsonGet",
"type" = "StarrocksJar",
"file" = "http://http_host:http_port/udf-1.0-SNAPSHOT-jar-with-dependencies.jar"
);- 用户自定义类必须实现 evaluate 方法
- 无需继承
- 需将 jar 包上传至 FE 和 BE 能访问的 HTTP 服务器,并且 HTTP 服务需要一直开启。FE 会对 UDF 所在 Jar 包进行校验并计算校验值,BE 会下载 UDF 所在 Jar 包并执行。
FLINK
CREATE FUNCTION to_json AS 'com.shuyun.dpe.flink.udf.ToJsonFunction';- 继承 ScalarFunction,实现 getName()和 eval()
- 将打包好的 jar 放到 flink 集群的 lib 目录下,重启 flink 集群
- 声明函数
PolarDB-X JAVA UDF 语法设计
创建 UDF
CREATE FUNCTION <函数名> RETURNTYPE <返回值类型> INPUTTYPE <输入类型1, 输入类型2...>
IMPORT <引用声明> ENDIMPORT
CODE
public Object compute(<T> args1, <T> args2, ...) {
<用户函数代码>
}
ENDCODE删除 UDFDROP FUNCTION [IF EXISTS] <函数名>显示所有 UDF
SHOW FUNCTION;PolarDB-X JAVA UDF执行流程设计
在 PolarDB-X 中,存在一些系统内置的扩展函数,例如 Sin() 函数、Cos() 函数、Sub() 函数等等,这些函数由`ExtraFunctionManager`进行统一管理,并将对应的构造函数保存在缓存中,在执行过程中根据函数名进行相应的调用。
系统内置函数的执行流程
加载过程:

一个扩展函数需要继承 AbstractScalarFunction,并实现 compute()以及 getFunctionNames()函数以便后续的统一调用。在 PolarDB-X 的初始化阶段,
CobarServer.warmup()触发
ExtraFunctionManager.getExtraFunction("warmup", null, null);来加载 ExtraFunctionManager 类。
ExtraFunctionManager 中的静态方法 initFunctions() 将会做以下两件事:
-自动扫描 IExtraFunction 对应 Package 目录下的所有 Function 实现
-自动扫描 Extension 扩展方式下的自定义实现,比如在 META-INF/tddl 或 META-INF/services 添加扩展配置文件并通过 addFunction() 函数来进行函数签名的注册工作。在函数的执行过程中,使用了一个 HashMap 来储存函数签名以及对应的构造器,定义如下:
private static Map<FunctionSignature, Constructor<?>> functionCaches = new HashMap
调用过程:

Java UDF 的执行流程
加载过程:

类似的,我们为所有 UDF 定义了一个父类 UserDefinedJavaFunction,其继承了AbstractScalarFunction。用户只需实现简化后的 compute() 方法即可,降低了使用成本。我们采用 UserDefinedJavaFunctionManager 来管理所有的 Java UDF。
在 PolarDB-X 的初始化阶段,
CobarServer.warmup()触发
UserDefinedJavaFunctionManager.getExtraFunction("warmup", null, null);来加载UserDefinedJavaFunctionManager类。
UserDefinedJavaFunctionManager 中的静态方法将会在对已经进行持久化的函数进行加载和注册,并通过 addFunction() 函数来进行函数签名的注册工作。这里我们同样适用了一个 HashMap 来存储函数名以及构造函数,定义如下:
private static Map<String, Constructor<?>> javaFunctionCaches = new HashMap<>();
调用过程:

PolarDB-X Java UDF创建及删除过程设计
创建过程:

对于用户输入的 UDF 创建语句,polardb-x 会对其进行解析,以获得后续执行所需的函数名、输入输出类型、引用声明以及用户逻辑代码。接着,我们会将这些关键参数进行拼接,以生成一个字符串形式的类文件,类名对应函数名。该类继承内部定义的 UserDefinedJavaFunction ,并实现了相应的 compute() 以及 getFunctionName() 方法。接着通过使用开源框架 Janino 的相应 API,把该类编译并加载到 JVM 中。这里我们使用了一个 Map 来存储函数名以及其构造方法,方便在后续调用过程中进行调用
public static Map<String, Constructor<?>> javaFunctionCaches = new HashMap<>();最后,将其注册到相应的运算符表中,防止解析过程中无法识别函数关键字。在创建过程中我们将进行持久化的工作。
删除过程:
查询过程:
类型拦截:
在 polardb-x 中,定义了很多内部类型,而在函数的计算过程中,经常会使用到内部自定义的类型来进行计算。但是在 Java UDF 的创建过程中,用户并不了解会用到有哪些内部类型,同时也不能将这些内部类型暴露给用户。因此需要在 Java UDF 的创建过程以及执行过程中对内部类型进行拦截,并转化成为用户期望且已知的数据类型(即 Java 类型)。具体实现过程如下图所示:

函数持久化
对于用户来说,一般更希望自己注册的函数能够在 polardb-x 重启后仍然存在于系统中,直至手动删除。因此,对于用户函数的持久化工作十分重要。我们使用 polardb-x 存储重要信息的 MetaDB 实现持久化工作,将用户定义的函数存储在 MetaDB 中,避免丢失。表结构具体如下:
create table if not exists `user_defined_java_code` (
`func_name` varchar(100) not null,
`class_name` varchar(100) not null,
`code` text not null,
`code_language` varchar(100) not null,
`input_types` varchar(200) not null,
`result_type` varchar(100) not null,
primary key(`func_name`)
) charset=utf8节点间同步

polardb-x 常常被用于集群化部署,如果用户在某一个节点上创建了一个 UDF,那么其他节点上也应该能够使用这个 UDF。这里通过使用节点间的通讯的手段来实现。当一个节点上创建了 UDF 时,将会通过 SyncManagerHelper.sync() 方法将函数名传递给其他节点。其他节点获取到相应函数名以后,在 MetaDB 中搜索相应表记录并加载注册。同样的道理,当某个节点删除了一个 UDF 时,函数名同样会传递给其他节点,以便进行删除操作。
三、随访
参与开源之夏
ospp:你的实习经历很丰富,请问这对于你此次所参与的项目有什么帮助吗?
乔奥:实习比较大程度上提升了我的编码能力以及解决问题的思考能力,因为在参加这个活动之前,我没有了解过 UDF 的相关知识,也没有接触过数据库内核的开发,需要我在比较短的时间内了解这些领域,并尝试给出一个初步的解决方案,这对我来说还是很大的挑战,因此我在实习过程中养成的学习能力发挥了比较大的作用。尤其是我的第二段实习,主要在做数据中间件,接触到了几十种数据库领域的很多知名产品,这也很大程度上帮助我理解了 PolarDB-X。
参与开源社区
ospp:介绍一下你眼中的 PolarDB 开源社区吧。
乔奥:PolarDB-X 虽然开源的时间不算很长,但是社区内部还是非常活跃的,可以看到官方每隔一段时间都会合并一部分内部代码进入开源项目中。同时虽然贡献者没有顶级开源项目那么多,但是基本上每个 issue 都有问必答,社区的 commiter 们都非常积极,各个贡献者们也在努力地发挥自己的一份力。
ospp:在项目进行过程中有没有印象深刻的经历?社区和导师为你提供了怎样的帮助?
乔奥:在社区的三个月以来,我从只能提交一些拼写错误 pr,到输出社区 debug-CN 文档,再到掌握系统中函数执行流程,输出自己的解决方案并实现,离不开两位老师的循循善诱和耐心指导。例如仅在实现用户函数接口这一环节,前后经历了三次设计变动。在设计初始,我并没有考虑到该接口涉及到很多内部数据类型,因此简单的认为用户可以直接操作这些数据类型,但在简单测试以后便发现了其中的问题;在越寒老师的指导下,我设计了第二版方案,即对用户函数的输入进行拦截,将传递给用户的函数参数都提前进行类型转化,转为 java 的常用类型,此时用户实现的方法为`Object compute(Object[] args)`;此时虽然解决了数据类型的问题,但输入参数类型全变成了 Object 类,用户使用起来相当繁琐,在玄弟老师的指导下,我对这部分进行了进一步的优化,通过使用反射的方式,实现对于用户实现的函数的获取,此时用户只需要编写签名为`Object compute(<T> args1, <T> args2, ...)`即可,大大简化了用户的操作。
ospp:在本次开源之夏项目成功结项后,你有继续投入到开源社区后续发展的打算吗?
乔奥:当然。非常感谢开源之夏给了我这个机会,能够参与到 PolarDB 这么大的社区里来,这对我来说弥足珍贵。实际上,就在上周我又帮助社区修复了一个 bug,并提交了 pull request,同时我也经常性地关注社区中的 issue,看看有什么是我能为社区做的事情。能够为社区做出贡献、和社区内的各位贡献者交流互助是一件很令我快乐的事情。
收获与寄语
ospp:开源之夏旨在培养和发掘更多优秀的开发者,通过这次的活动你有什么意想不到的收获和成长吗?
乔奥:经过这次开源之夏的活动,我更深一步涉猎了数据库开发这一领域,也激发了我对于基础技术研究和开发的兴趣。最近我也在持续的关注这一领域的论文和产品,并将自己研究生阶段的研究方向确定为了数据库索引相关的方向。以我目前的接触来看,我觉得学习这一领域的关键在于多看和多读:多看一些概念性、逻辑性的描述,掌握这个领域的术语和基础知识,然后就应该具体得看一看各个产品的实现代码,才能参透其中的内涵。作为一个开发者而言,对代码的探索应该是放在第一位的,但同时也要注重自己的文字表达和画图表达能力,这些也很重要。
ospp:对于爱好开源的学弟学妹你有什么想对他们说的吗?




