1、概览
1.1 功能
Flink Table API 和 SQL 使用户能够使用函数进行数据转换。
1.2 函数类型
Flink 中的功能分类有两个维度。
一维是系统(或内置)函数与目录函数。系统函数没有命名空间,只能用它们的名字来引用。目录函数属于目录和数据库,因此它们具有目录和数据库命名空间,可以通过完全/部分限定名 (catalog.db.func
或db.func
) 或仅通过函数名来引用它们。
另一个维度是临时函数与持久函数。临时函数是不稳定的,只能在会话的生命周期内使用,它们始终由用户创建。持久函数存在于会话的整个生命周期中,它们要么由系统提供,要么持久存在于目录中。
这两个维度为 Flink 用户提供了 4 类功能:
临时系统功能
系统功能
临时目录功能
目录功能
1.3 引用函数
用户可以通过两种方式在 Flink 中引用函数——精确引用函数或模糊引用函数。
1.3.1 精确函数参考
精确的函数引用使用户能够特定地、跨目录和跨数据库使用目录函数,例如select mycatalog.mydb.myfunc(x) from mytable
和select mydb.myfunc(x) from mytable
。
这仅从 Flink 1.10 开始支持。
1.3.2 模糊函数参考
在不明确的函数引用中,用户只需在 SQL 查询中指定函数的名称,例如select myfunc(x) from mytable
.
1.4 函数解析顺序
只有当有不同类型但相同名称的函数时,解析顺序才重要,例如当有三个函数都命名为“myfunc”但分别属于临时目录、目录和系统函数时。如果没有函数名冲突,函数只会被解析为唯一的。
1.4.1 精确函数参考
因为系统函数没有命名空间,所以 Flink 中精确的函数引用必须指向临时目录函数或目录函数。
解析顺序为:
临时目录功能
目录功能
1.4.2 模糊函数参考
解析顺序为:
临时系统功能
系统功能
临时目录功能,在session的当前目录和当前数据库中
目录函数,在会话的当前目录和当前数据库中
2、系统(内置)函数
Flink Table API & SQL 为用户提供了一组内置的数据转换函数。本页简要介绍了它们。如果您需要的功能尚不支持,您可以实现用户自定义功能。如果你觉得这个功能够通用,请为它打开一个 Jira issue并详细说明。
3、标量函数
标量函数将零、一个或多个值作为输入并返回单个值作为结果。
3.1 比较函数
| = | > | < |
|---|---|---|
| <> | >= | <= |
| is null | like | similar to |
| is not null | not like(未开通) | not similar to(未开通) |
| in | not in(未开通) |
很多的比较函数都是不能进行使用的,所以在使用的时候一定要注意,一定要参照一下官网。。。
3.2 逻辑函数
| OR | AND |
|---|---|
| IS FALSE | IS TRUE |
| IS NOT FALSE | IS NOT TRUE |
| NOT |
逻辑函数多数都是可以进行使用的
3.3 算术函数
| + | * |
|---|---|
| - | / |
| % | POWER() |
| ABS() | SQRT() |
| LN() | LOG(numeric2) |
| LOG10() | LOG(numeric1, numeric2) |
| LOG2() | EXP() |
| CEIL() | FLOOR() |
| CEILING() | SIN() |
| SINH() | COS() |
| TAN() | TANH() |
| COT() | ASIN() |
| ACOS() | ATAN() |
| ATAN2() | COSH() |
| DEGREES() | RADIANS() |
| SIGN() | ROUND() |
| PI() | E() |
| RAND() | RAND(INT) |
| RAND_INTEGER(INT) | RAND_INTEGER(INT1, INT2) |
| UUID() | BIN(INT) |
| HEX() | TRUNCATE() |
标成红色的部分说明是最常用的
3.4 字符串函数
| str1 ||str2 | char_length(str1 )、character_length(str1 ) |
|---|---|
| upper(str1 ) | lower(str1 ) |
| position(str1 in str2) | trim() |
ltrim(string)
rtrim(string)
repeat(string, int)
regexp_replace(string1, string2, string3)
overlay(string1 placing string2 from integer1 [ for integer2 ])
substring(string from integer1 [ for integer2 ])
replace(string1, string2, string3)
regexp_extract(string1, string2[, integer])
initcap(string)
concat(string1, string2,...)
concat_ws(string1, string2, string3,...)
lpad(string1, integer, string2)
rpad(string1, integer, string2)
from_base64(string)
to_base64(string)
ascii(string)
chr(integer)
decode(binary, string)
instr(string1, string2)
left(string, integer)
right(string, integer)
locate(string1, string2[, integer])
parse_url(string1, string2[, string3])
regexp(string1, string2)
reverse(string)
split_index(string1, string2, integer1)
str_to_map(string1[, string2, string3]])
substr(string[, integer1[, integer2]])3.5 时间函数
https://helpcdn.aliyun.com/document_detail/62717.html
| 时间函数 | 用法 | 描述 |
|---|---|---|
| date string | select date '2021-10-07'; | 以“yyyy-MM-dd”的形式返回从字符串解析的 SQL 日期。 |
| time string | select time '08:52:00'; | 以“HH:mm:ss”的形式返回从字符串解析的 SQL 时间。 |
| timestamp string | select timestamp '2021-10-06 12:12:12.111'; | 以“yyyy-MM-dd HH:mm:ss[.SSS]”的形式返回 从字符串解析的 SQL 时间戳。 |
| interval string range | select interval '10 00:00:00.004' day to second; | 解析格式为 "dd hh:mm:ss.fff" 的间隔字符串, 用于毫秒的 SQL 间隔或 "yyyy-mm" 的格式为 月的 SQL 间隔。间隔范围可能是 DAY、MINUTE、 DAY TO HOUR 或 DAY TO SECOND,以毫秒为间隔; YEAR 或 YEAR TO MONTH 表示几个月的间隔。 例如,INTERVAL '10 00:00:00.004' DAY TO SECOND、 INTERVAL '10' DAY 或 INTERVAL '2-10' YEAR TO MONTH 返回间隔。 |
| year(date) | select year(date '2021-10-16'); | 从SQL日期返回年份。相当于从日期中提取(年)。 例如,year(date '1994-09-27')返回1994。 |
| localtime | select localtime; | 返回本地时区的当前 SQL 时间,返回类型为 TIME(0)。 在流模式下对每条记录进行评估。但在批处理模式下, 它在查询开始时被评估一次,并对每一行使用相同的结果。 |
| localtimestamp | select localtimestamp; | 返回本地时区的当前 SQL 时间戳,返回类型为 TIMESTAMP(3)。 在流模式下对每条记录进行评估。但在批处理模式下, 它在查询开始时被评估一次,并对每一行使用相同的结果。 |
| current_time | select current_time; | 返回本地时区的当前 SQL 时间,这是 LOCAL_TIME 的同义词。 |
| current_date | select current_date; | 返回本地时区中的当前SQL日期。 在流模式下对每个记录进行评估。 但是在批处理模式下,在查询开始时计算一次, 并对每一行使用相同的结果。 |
| current_timestamp | select current_timestamp; | 返回本地时区的当前SQL时间戳, 返回类型为TIMESTAMP_LTZ(3)。 在流模式下对每个记录进行评估。 但是在批处理模式下,在查询开始时计算一次, 并对每一行使用相同的结果。 |
| now() | select now(); | 返回本地时区的当前SQL时间戳,这是CURRENT_TIMESTAMP的同义词。 |
| current_row_timestamp() | select current_row_timestamp(); | 返回本地时区的当前sql时间戳,这是current_timestamp的同义词。 返回本地时区的当前sql时间戳,返回类型为timestamp_ltz(3)。 无论是批处理还是流模式,都会对每个记录进行评估。 |
| extract(timeinteravlunit from temporal) | select extract(day from date '2006-06-05'); | 返回从temporal的timeintervalunit部分提取的长值。 例如,extract(day from date '2006-06-05')返回5。 |
| quarter(date) | select quarter(date '1994-09-27'); | 从sql date日期返回每季度(1到4之间的整数)。 相当于extract(quarter from date)。 例如,quarter(date '1994-09-27') |
| month(date) | select month(date '1994-09-27'); | 从sql date返回一年中的月份(1到12之间的整数)。 相当于extract(month from date)。 例如,month(date '1994-09-27')返回9。 |
| week(date) | select week(date '1994-09-27'); | 从sql date返回一年中的月份(1到12之间的整数)。 从sql date返回一年中的星期(1到53之间的整数)。 相当于extract(从日期开始的一周)。 例如:week(date '1994-09-27')返回39。 相当于extract(month from date)。 例如,month(date '1994-09-27')返回9。 |
| dayofyear(date) | select dayofyear(date '1994-09-27'); | 从sql日期返回一年中的日期(1到366之间的整数)。 相当于extract(doy from date)。 例如,dayofyear(日期'1994-09-27')返回270。 |
| dayofmonth | select extract(day from date '1994-09-27'); select dayofweek(date '1994-09-27'); | 从sql date返回一个月中的哪一天(1到31之间的整数)。 相当于extract(day from date)。 例如,dayofweek(日期'1994-09-27')返回3。 |
| hour(timestamp) | select hour(timestamp '1994-09-27 13:14:15'); | 从sql时间戳中返回一天中的小时(0到23之间的整数)。 相当于extract(hour from timestamp)。 例如,minute(timestamp '1994-09-27 13:14:15')返回14。 |
| minute(timestamp) | select minute(timestamp '1994-09-27 13:14:15'); | 从sql时间戳中返回每小时的分钟(0到59之间的整数)。 相当于extract(minute from timestamp)。 例如,minute(timestamp '1994-09-27 13:14:15')返回14。 |
| second(timestamp) | select second(timestamp '1994-09-27 13:14:15'); | 从sql时间戳返回一分钟中的秒(0到59之间的整数)。 相当于extract(second from timestamp)。 例如,second(timestamp '1994-09-27 13:14:15')返回15。 |
| floor(timepoint to timeintervalunit) | select floor(time '12:44:31' to minute); | 返回一个值,该值将时间点舍入到时间单元timeintervalunit 例如,floor(time '12:44:31' to minute) 返回12:44:00。 |
| ceil(timepoint to timeintervaluntit) | select ceil(time '12:44:31' to minute); | 返回一个值,该值将时间点舍入到时间单元timeintervalunit 例如,ceil(time '12:44:31' to minute)将于12:45:00。 |
| (timepoint1, temporal1) overlaps (timepoint2, temporal2) | select (time '2:55:00',interval '1' hour) overlaps (time '3:30:00',interval '2' hour); | 如果(timepoint1, temporal1)和(timepoint2, temporal2) 定义的两个时间间隔重叠,则返回true。 时间值可以是时间点,也可以是时间间隔。 例如,(time '2:55:00', interval '1' hour) overlaps (time '3:30:00', interval '2' hour) 返回true;overlaps (time '10:15:00', interval '3' hour)返回false。 |
| date_format(timestamp, string) | select date_format(timestamp '2021-10-06 12:12:12.111','yyyy-MM-dd'); | 将时间戳转换为由日期格式字符串指定的格式的字符串值。 格式字符串与Java的SimpleDateFormat兼容。 |
| timestampadd(timeintervalunit, interval, timepoint) | select timestampadd(hour,3,timestamp '2021-10-06 12:12:12.111'); | 时间的加法根据单位 |
| timestampdiff(timepointunit, timepoint1, timepoint2) | select timestampdiff(hour,timestamp '2021-10-06 12:12:12.111',timestamp '2021-10-06 15:12:12.111'); | 返回timepoint1和timepoint2之间的(带符号的)timepointunit数目。 interval的单位由第一个参数给出,它应该是以下值之一: second、minute、hour、day、month或year。 |
| convert_tz(string1, string2, string3) | select convert_tz('1970-01-01 00:00:00','UTC','America/Los_Angeles'); | 将datetime string1(默认的ISO时间戳格式为'yyyy-MM-dd HH:mm:ss') 从时区string2转换为时区string3。 时区的格式应该是缩写, 如“PST”,全名, 如“America/Los_Angeles”,或者自定义ID, 如“GMT-08:00”。 例如,convert_tz('1970-01-01 00:00:00','UTC','America/Los_Angeles')返回'1969-12-31 16:00:00'。 |
| from_unixtime(numeric[, string]) | select from_unixtime(1505404800) | 以字符串格式返回数字参数的表示形式(默认为'yyyy-MM-dd HH:mm:ss')。 numeric是一个内部时间戳值,表示自'1970-01-01 00:00:00' UTC以来的秒数, 例如UNIX_TIMESTAMP()函数生成的时间戳值。返回值在会话时区中表示 (在tablecconfig中指定)。 例如,FROM_UNIXTIME(44)在UTC时区返回'1970-01-01 00:00:44', 但在'Asia/Tokyo'时区返回'1970-01-01 09:00:44'。 |
| unix_timestamp() | select unix_timestamp(); | 获取当前Unix时间戳(以秒为单位)。这个函数是不确定的,这意味着将为每个记录重新计算值。 |
| unix_timestamp(string1[, string2]) | SELECT UNIX_TIMESTAMP('2021-10-06','yyyy-MM-dd'); SELECT date_format('2021-10-06 20:20:20','yyyy-MM-dd'); | 将日期时间字符串string1的格式为string2 (默认为yyyy-MM-dd HH:mm:ss,如果没有指定,则HH:mm:ss) 转换为Unix时间戳(秒),使用表config中的指定时区。 |
| to_date(string1[, string2]) | select to_date('2021-10-06','yyyy-MM-dd'); | 将日期字符串string1的格式string2(默认为'yyyy-MM-dd')转换为日期。 |
| to_timestamp_ltz(numeric, precision) | select to_timestamp_ltz(1000,3); | 将epoch秒或epoch毫秒转换为TIMESTAMP_LTZ, 有效精度为0或3,0表示TO_TIMESTAMP_LTZ(epochSeconds, 0), 3表示TO_TIMESTAMP_LTZ(epochMilliseconds, 3)。 |
| to_timestamp(string1[, string2]) | SELECT TO_TIMESTAMP('2021-10-06 12:12:12','yyyy-MM-dd HH:mm:ss') | 将会话时区(由tablecconfig指定)下的日期时间字符串string1的格式string2 (默认:'yyyy-MM-dd HH:mm:ss')转换为时间戳。 |
3.5 条件函数
https://helpcdn.aliyun.com/document_detail/62715.html
3.5.1 is_decimal
语法
BOOLEAN IS_DECIMAL(VARCHAR str)| 参数 | 数据类型 |
|---|---|
| str | VARCHAR |
str字符串如果可以转换为十进制数值返回TRUE;否则返回FALSE。
select
is_decimal('1') as boo1,
is_decimal('123') as boo2,
is_decimal('2') as boo3,
is_decimal('11.4445') as boo4,
is_decimal('3') as boo5,
is_decimal('asd') as boo6,
is_decimal('') as boo73.5.2 is_digit
BOOLEAN IS_DIGIT(VARCHAR str)| 参数 | 数据类型 |
|---|---|
| str | VARCHAR |
str中只包含数字则返回true,否则返回false。返回值为BOOLEAN类型。
SELECT
IS_DIGIT(1) as boo1,
IS_DIGIT('abc') as boo2,
IS_DIGIT('') as boo3;3.5.3 is_alpha
BOOLEAN IS_ALPHA(VARCHAR str) | 参数 | 数据类型 |
|---|---|
| str | VARCHAR |
如果str中只包含字母,则返回true,否则返回false。
SELECT
is_alpha(1) as boo1,
is_alpha('abc') as boo2,
is_alpha('') as boo3;3.5.4 if
T if (BOOLEAN testCondition, T valueTrue, T valueFalseOrNull)| 参数 | 数据类型 |
|---|---|
| testCondition | BOOLEAN |
| valueTrue | 可以为任意类型,但valueTrue和valueFalseOrNull类型需保持一致。 |
| valueFalseOrNull | 可以为任意类型,但valueTrue和valueFalseOrNull类型需保持一致。 |
select if(1<2,'正常情况','错误答案');3.5.5 case when
CASE WHEN a THEN b [WHEN c THEN d]* [ELSE e] END如果表达式a为TRUE,则返回b;如果表达式a为FALSE、c为TRUE,则返回d;如果表达式a和c都为FALSE,则返回e。
CASE WHEN返回常量字符串时,会在字符串后面补全空格。例如,当满足else条件时,返回值ios
后面会多几个空格。
解决方法:
利用TRIM函数去除空格,该示例中,涉及
os
的字段都改为TRIM(os)
。利用CAST函数去除空格,将常量字符串转为VARCHAR类型。
select
case when '123'='123' then '确实相等' else '不相等' end;3.5.6 nullif
NULLIF(A,B)两个参数的值相同,则返回null,值不同则返回第一个参数的值。
select nullif('123','123'); -- null
select nullif('123',''); -- 123
select nullif('','123'); -- 空3.5.7 coalesce
COALESCE(A,B,...)| 参数 | 数据类型 |
|---|---|
| A | 任意数据类型 |
| B | 任意数据类型 |
返回列表中第一个非null的值,返回值类型和参数类型相同。如果列表中所有的值都是null,则返回null。
select coalesce(null,'123');3.6 类型转换函数
3.6.1 cast
返回一个被强制转换为类型类型的新值。例如,CAST('42' AS INT) 返回 42;CAST(NULL AS VARCHAR) 返回 VARCHAR 类型的 NULL。
select cast('123' as int);3.6.2 typeof
返回输入表达式的数据类型的字符串表示形式。默认情况下,返回的字符串是一个摘要字符串,可能会为了可读性而省略某些细节。如果 force_serializable 设置为 TRUE,则该字符串表示可以保留在目录中的完整数据类型。请注意,特别是匿名的内联数据类型没有可序列化的字符串表示。在这种情况下,返回 NULL。
select typeof(cast('123' as int),true);3.7 集合函数
3.7.1 cardinality(array)
返回数组中元素的个数。
select cardinality(array['hello','aa']);3.7.2 array [INT ]
select array['hello','aa'][1];3.7.3 element(array)
select element(array['hello']);这个之允许数组中只有一个元素,感觉这个没有什么适用的价值
3.7.4 cardinality(map)
select cardinality( map['k1','v1','k2','v2'])3.7.5 map [value ]
select map['k1','v1','k2','v2'];3.8 聚合函数
| 时间函数 | 描述 |
|---|---|
| count() count(columName) count(distinct columName) count(1) count(*) | count() 函数返回匹配指定条件的行数。 COUNT(column_name) 函数返回指定列的值的数目(NULL 不计入) COUNT() 函数返回表中的记录数 COUNT(DISTINCT column_name) 函数返回指定列的不同值的数目 count(1) 最好是使用count(1) 因为大多数情况count(1)都比count()好 |
| rank() | 返回值在一组值中的排名。 结果是 1 加上分区顺序中当前行之前或等于当前行的行数。 这些值将在序列中产生间隙。 |
| dense_rank() | 返回值在一组值中的排名。结果是一加先前分配的等级值。 与函数 rank 不同,dense_rank 不会在排名序列中产生间隙。 |
| row_number() | 根据窗口分区内行的顺序,为每一行分配一个唯一的序列号,从一开始。 ROW_NUMBER 和 RANK 相似。ROW_NUMBER 按顺序对所有行进行编号 (例如 1、2、3、4、5)。 RANK 为平局提供相同的数值(例如 1、2、2、4、5)。 |
| lead | 返回窗口中当前行之后第 offsetth 行处的表达式值。 offset 的默认值为 1,default 的默认值为 NULL。 |
| lag | 返回窗口中当前行之前第 offsetth 行处的表达式值。 offset 的默认值为 1,default 的默认值为 NULL。 |
| first_value | 返回一组有序值中的第一个值。 |
| last_value | 返回一组有序值中的最后一个值。 |
| listagg | 连接字符串表达式的值并在它们之间放置分隔符值。 字符串末尾不添加分隔符。分隔符的默认值为“,”。 |
4、udf
package com.wmy.flink.sql.flinksql.fun.udf;import lombok.*;import org.apache.flink.api.common.functions.MapFunction;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.table.api.EnvironmentSettings;import org.apache.flink.table.api.Table;import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;import org.apache.flink.table.functions.ScalarFunction;import static org.apache.flink.table.api.Expressions.$;import static org.apache.flink.table.api.Expressions.call;/*** @project_name: flinkDemo* @package_name: com.wmy.flink.sql.flinksql.fun.udf* @Author: wmy* @Date: 2021/10/7* @Major: 数据科学与大数据技术* @Post:大数据实时开发* @Email:wmy_2000@163.com* @Desription: flink UDF 用户案例* @Version: wmy-version-01*/public class Flink_SQL_Fun_UDF_01 {public static void main(String[] args) throws Exception {System.out.println("Flink_SQL_Fun_UDF_01 >>> 2021-10-07");// 创建流式环境和表环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, environmentSettings);// 读取数据DataStreamSource<String> socketTextStream = env.socketTextStream("flink01", 8888);// 对数据进行转换SingleOutputStreamOperator<User> waterSensorDS = socketTextStream.map(new MapFunction<String, User>() {@Overridepublic User map(String value) throws Exception {String[] fields = value.split(",");return User.builder().id(fields[0]).name(fields[1]).age(Integer.parseInt(fields[2])).build();}});//3.将流转换为动态表Table table = tableEnv.fromDataStream(waterSensorDS);// 不注册就进行使用//table.select($("id"), $("name"), call(UserNameLength.class, $("name")).as("nameLength"), $("age")).execute().print();// 注册在进行使用tableEnv.createTemporarySystemFunction("nameLength", UserNameLength.class);tableEnv.sqlQuery("select id,name,age,nameLength(name) as nameLength from " + table).execute().print();// 执行流式环境env.execute("Flink_SQL_Fun_UDF_01 >>> 2021-10-07");}// define function logicpublic static class UserNameLength extends ScalarFunction {public int eval(String value) {return value.length();}}@Data@NoArgsConstructor@AllArgsConstructor@Builder@ToStringpublic static class User {private String id;private String name;private int age;}}
欢迎大家关注我的微信公众号
我会每天分享我的学习笔记和资料,大家也可以多多留言,共同进步。




