暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Flink SQL 函数

骚明的大数据之旅 2021-10-08
4406

1、概览

1.1 功能

Flink Table API 和 SQL 使用户能够使用函数进行数据转换。

1.2 函数类型

Flink 中的功能分类有两个维度。

一维是系统(或内置)函数与目录函数。系统函数没有命名空间,只能用它们的名字来引用。目录函数属于目录和数据库,因此它们具有目录和数据库命名空间,可以通过完全/部分限定名 (catalog.db.func
db.func
) 或仅通过函数名来引用它们。

另一个维度是临时函数与持久函数。临时函数是不稳定的,只能在会话的生命周期内使用,它们始终由用户创建。持久函数存在于会话的整个生命周期中,它们要么由系统提供,要么持久存在于目录中。

这两个维度为 Flink 用户提供了 4 类功能:

  1. 临时系统功能

  2. 系统功能

  3. 临时目录功能

  4. 目录功能

1.3 引用函数

用户可以通过两种方式在 Flink 中引用函数——精确引用函数或模糊引用函数。

1.3.1 精确函数参考

精确的函数引用使用户能够特定地、跨目录和跨数据库使用目录函数,例如select mycatalog.mydb.myfunc(x) from mytable
select mydb.myfunc(x) from mytable

这仅从 Flink 1.10 开始支持。

1.3.2 模糊函数参考

在不明确的函数引用中,用户只需在 SQL 查询中指定函数的名称,例如select myfunc(x) from mytable
.

1.4 函数解析顺序

只有当有不同类型但相同名称的函数时,解析顺序才重要,例如当有三个函数都命名为“myfunc”但分别属于临时目录、目录和系统函数时。如果没有函数名冲突,函数只会被解析为唯一的。

1.4.1 精确函数参考

因为系统函数没有命名空间,所以 Flink 中精确的函数引用必须指向临时目录函数或目录函数。

解析顺序为:

  1. 临时目录功能

  2. 目录功能

1.4.2 模糊函数参考

解析顺序为:

  1. 临时系统功能

  2. 系统功能

  3. 临时目录功能,在session的当前目录和当前数据库中

  4. 目录函数,在会话的当前目录和当前数据库中

2、系统(内置)函数

Flink Table API & SQL 为用户提供了一组内置的数据转换函数。本页简要介绍了它们。如果您需要的功能尚不支持,您可以实现用户自定义功能。如果你觉得这个功能够通用,请为它打开一个 Jira issue并详细说明。

3、标量函数

标量函数将零、一个或多个值作为输入并返回单个值作为结果。

3.1 比较函数

=> <
<>>=<=
is nulllikesimilar to
is not nullnot like(未开通)not similar to(未开通)
innot in(未开通)

很多的比较函数都是不能进行使用的,所以在使用的时候一定要注意,一定要参照一下官网。。。

3.2 逻辑函数

OR AND
IS FALSEIS TRUE
IS NOT FALSEIS NOT TRUE
NOT

逻辑函数多数都是可以进行使用的

3.3 算术函数

+*
-/
%POWER()
ABS()SQRT()
LN()LOG(numeric2)
LOG10()LOG(numeric1, numeric2)
LOG2()EXP()
CEIL()FLOOR()
CEILING()SIN()
SINH()COS()
TAN()TANH()
COT()ASIN()
ACOS()ATAN()
ATAN2()COSH()
DEGREES()RADIANS()
SIGN()ROUND()
PI()E()
RAND()RAND(INT)
RAND_INTEGER(INT)RAND_INTEGER(INT1, INT2)
UUID()BIN(INT)
HEX()TRUNCATE()

标成红色的部分说明是最常用的

3.4 字符串函数

str1 ||str2char_length(str1 )、character_length(str1 )
upper(str1 )lower(str1 )
position(str1 in str2)trim()
ltrim(string)
rtrim(string)
repeat(string, int)
regexp_replace(string1, string2, string3)
overlay(string1 placing string2 from integer1 [ for integer2 ])
substring(string from integer1 [ for integer2 ])
replace(string1, string2, string3)
regexp_extract(string1, string2[, integer])
initcap(string)
concat(string1, string2,...)
concat_ws(string1, string2, string3,...)
lpad(string1, integer, string2)
rpad(string1, integer, string2)
from_base64(string)
to_base64(string)
ascii(string)
chr(integer)
decode(binary, string)
instr(string1, string2)
left(string, integer)
right(string, integer)
locate(string1, string2[, integer])
parse_url(string1, string2[, string3])
regexp(string1, string2)
reverse(string)
split_index(string1, string2, integer1)
str_to_map(string1[, string2, string3]])
substr(string[, integer1[, integer2]])

3.5 时间函数

https://helpcdn.aliyun.com/document_detail/62717.html

时间函数用法描述
date stringselect date '2021-10-07';以“yyyy-MM-dd”的形式返回从字符串解析的 SQL 日期。
time stringselect time '08:52:00';以“HH:mm:ss”的形式返回从字符串解析的 SQL 时间。
timestamp stringselect timestamp '2021-10-06 12:12:12.111';以“yyyy-MM-dd  HH:mm:ss[.SSS]”的形式返回     从字符串解析的 SQL 时间戳。
interval string rangeselect interval '10 00:00:00.004' day to second;解析格式为 "dd  hh:mm:ss.fff" 的间隔字符串,     用于毫秒的 SQL 间隔或 "yyyy-mm" 的格式为     月的 SQL 间隔。间隔范围可能是 DAY、MINUTE、     DAY TO HOUR 或 DAY TO SECOND,以毫秒为间隔;    YEAR 或 YEAR TO MONTH 表示几个月的间隔。    例如,INTERVAL '10 00:00:00.004' DAY TO SECOND、     INTERVAL '10' DAY 或 INTERVAL '2-10'      YEAR TO MONTH 返回间隔。
year(date) select year(date '2021-10-16');从SQL日期返回年份。相当于从日期中提取(年)。    例如,year(date '1994-09-27')返回1994。
localtimeselect localtime;返回本地时区的当前 SQL 时间,返回类型为  TIME(0)。    在流模式下对每条记录进行评估。但在批处理模式下,     它在查询开始时被评估一次,并对每一行使用相同的结果。
localtimestampselect localtimestamp;返回本地时区的当前 SQL 时间戳,返回类型为  TIMESTAMP(3)。    在流模式下对每条记录进行评估。但在批处理模式下,     它在查询开始时被评估一次,并对每一行使用相同的结果。
current_timeselect current_time;返回本地时区的当前 SQL 时间,这是 LOCAL_TIME 的同义词。
current_dateselect current_date;返回本地时区中的当前SQL日期。    在流模式下对每个记录进行评估。    但是在批处理模式下,在查询开始时计算一次,     并对每一行使用相同的结果。
current_timestampselect current_timestamp;返回本地时区的当前SQL时间戳,     返回类型为TIMESTAMP_LTZ(3)。    在流模式下对每个记录进行评估。    但是在批处理模式下,在查询开始时计算一次,     并对每一行使用相同的结果。
now()select now();返回本地时区的当前SQL时间戳,这是CURRENT_TIMESTAMP的同义词。
current_row_timestamp()select current_row_timestamp();返回本地时区的当前sql时间戳,这是current_timestamp的同义词。    返回本地时区的当前sql时间戳,返回类型为timestamp_ltz(3)。    无论是批处理还是流模式,都会对每个记录进行评估。
extract(timeinteravlunit from  temporal)select extract(day from date '2006-06-05');返回从temporal的timeintervalunit部分提取的长值。    例如,extract(day from date '2006-06-05')返回5。
quarter(date)select quarter(date '1994-09-27');从sql  date日期返回每季度(1到4之间的整数)。    相当于extract(quarter from date)。    例如,quarter(date '1994-09-27')
month(date)select month(date '1994-09-27');从sql  date返回一年中的月份(1到12之间的整数)。    相当于extract(month from date)。    例如,month(date '1994-09-27')返回9。
week(date)select week(date '1994-09-27');从sql  date返回一年中的月份(1到12之间的整数)。    从sql date返回一年中的星期(1到53之间的整数)。    相当于extract(从日期开始的一周)。    例如:week(date '1994-09-27')返回39。    相当于extract(month from date)。    例如,month(date '1994-09-27')返回9。
dayofyear(date)select dayofyear(date '1994-09-27');从sql日期返回一年中的日期(1到366之间的整数)。    相当于extract(doy from date)。    例如,dayofyear(日期'1994-09-27')返回270。
dayofmonthselect extract(day from date  '1994-09-27');     select dayofweek(date '1994-09-27');从sql  date返回一个月中的哪一天(1到31之间的整数)。    相当于extract(day from date)。    例如,dayofweek(日期'1994-09-27')返回3。
hour(timestamp)select hour(timestamp '1994-09-27 13:14:15');从sql时间戳中返回一天中的小时(0到23之间的整数)。    相当于extract(hour from timestamp)。    例如,minute(timestamp '1994-09-27 13:14:15')返回14。
minute(timestamp)select minute(timestamp '1994-09-27 13:14:15');从sql时间戳中返回每小时的分钟(0到59之间的整数)。    相当于extract(minute from timestamp)。    例如,minute(timestamp '1994-09-27 13:14:15')返回14。
second(timestamp)select second(timestamp '1994-09-27 13:14:15');从sql时间戳返回一分钟中的秒(0到59之间的整数)。    相当于extract(second from timestamp)。    例如,second(timestamp '1994-09-27 13:14:15')返回15。
floor(timepoint to  timeintervalunit)select floor(time '12:44:31' to minute);返回一个值,该值将时间点舍入到时间单元timeintervalunit     例如,floor(time '12:44:31' to minute) 返回12:44:00。
ceil(timepoint to  timeintervaluntit)select ceil(time '12:44:31' to minute);返回一个值,该值将时间点舍入到时间单元timeintervalunit     例如,ceil(time '12:44:31' to minute)将于12:45:00。
(timepoint1, temporal1)  overlaps (timepoint2, temporal2)select (time '2:55:00',interval '1' hour) overlaps (time  '3:30:00',interval '2' hour);如果(timepoint1,  temporal1)和(timepoint2, temporal2)     定义的两个时间间隔重叠,则返回true。    时间值可以是时间点,也可以是时间间隔。    例如,(time '2:55:00', interval '1' hour) overlaps (time '3:30:00', interval  '2' hour)     返回true;overlaps (time '10:15:00', interval '3' hour)返回false。
date_format(timestamp, string)select date_format(timestamp '2021-10-06  12:12:12.111','yyyy-MM-dd');将时间戳转换为由日期格式字符串指定的格式的字符串值。    格式字符串与Java的SimpleDateFormat兼容。
timestampadd(timeintervalunit,  interval, timepoint)select timestampadd(hour,3,timestamp '2021-10-06  12:12:12.111');时间的加法根据单位
timestampdiff(timepointunit,  timepoint1, timepoint2)select timestampdiff(hour,timestamp '2021-10-06  12:12:12.111',timestamp '2021-10-06 15:12:12.111');返回timepoint1和timepoint2之间的(带符号的)timepointunit数目。    interval的单位由第一个参数给出,它应该是以下值之一:     second、minute、hour、day、month或year。
convert_tz(string1, string2,  string3)select convert_tz('1970-01-01  00:00:00','UTC','America/Los_Angeles');将datetime  string1(默认的ISO时间戳格式为'yyyy-MM-dd HH:mm:ss')     从时区string2转换为时区string3。    时区的格式应该是缩写,     如“PST”,全名,     如“America/Los_Angeles”,或者自定义ID,     如“GMT-08:00”。    例如,convert_tz('1970-01-01 00:00:00','UTC','America/Los_Angeles')返回'1969-12-31  16:00:00'。
from_unixtime(numeric[,  string])select from_unixtime(1505404800)以字符串格式返回数字参数的表示形式(默认为'yyyy-MM-dd  HH:mm:ss')。    numeric是一个内部时间戳值,表示自'1970-01-01 00:00:00' UTC以来的秒数,     例如UNIX_TIMESTAMP()函数生成的时间戳值。返回值在会话时区中表示     (在tablecconfig中指定)。    例如,FROM_UNIXTIME(44)在UTC时区返回'1970-01-01 00:00:44',     但在'Asia/Tokyo'时区返回'1970-01-01 09:00:44'。
unix_timestamp()select unix_timestamp();获取当前Unix时间戳(以秒为单位)。这个函数是不确定的,这意味着将为每个记录重新计算值。
unix_timestamp(string1[,  string2])SELECT  UNIX_TIMESTAMP('2021-10-06','yyyy-MM-dd');     SELECT date_format('2021-10-06 20:20:20','yyyy-MM-dd');将日期时间字符串string1的格式为string2     (默认为yyyy-MM-dd HH:mm:ss,如果没有指定,则HH:mm:ss)     转换为Unix时间戳(秒),使用表config中的指定时区。
to_date(string1[, string2])select to_date('2021-10-06','yyyy-MM-dd');将日期字符串string1的格式string2(默认为'yyyy-MM-dd')转换为日期。
to_timestamp_ltz(numeric,  precision)select to_timestamp_ltz(1000,3);将epoch秒或epoch毫秒转换为TIMESTAMP_LTZ,     有效精度为0或3,0表示TO_TIMESTAMP_LTZ(epochSeconds, 0),      3表示TO_TIMESTAMP_LTZ(epochMilliseconds, 3)。
to_timestamp(string1[,  string2])SELECT TO_TIMESTAMP('2021-10-06 12:12:12','yyyy-MM-dd  HH:mm:ss')将会话时区(由tablecconfig指定)下的日期时间字符串string1的格式string2     (默认:'yyyy-MM-dd HH:mm:ss')转换为时间戳。

3.5 条件函数

https://helpcdn.aliyun.com/document_detail/62715.html

3.5.1 is_decimal

语法

BOOLEAN IS_DECIMAL(VARCHAR str)
参数数据类型
strVARCHAR

str字符串如果可以转换为十进制数值返回TRUE;否则返回FALSE。

select 
is_decimal('1') as boo1,
is_decimal('123') as boo2,
is_decimal('2') as boo3,
is_decimal('11.4445') as boo4,
is_decimal('3') as boo5,
is_decimal('asd') as boo6,
is_decimal('') as boo7

3.5.2 is_digit

BOOLEAN IS_DIGIT(VARCHAR str)
参数数据类型
strVARCHAR

str中只包含数字则返回true,否则返回false。返回值为BOOLEAN类型。

SELECT 
IS_DIGIT(1) as boo1,
IS_DIGIT('abc') as boo2,
IS_DIGIT('') as boo3;

3.5.3 is_alpha

BOOLEAN IS_ALPHA(VARCHAR str) 
参数数据类型
strVARCHAR

如果str中只包含字母,则返回true,否则返回false。

SELECT 
is_alpha(1) as boo1,
is_alpha('abc') as boo2,
is_alpha('') as boo3;

3.5.4 if

T if (BOOLEAN testCondition, T valueTrue, T valueFalseOrNull)
参数数据类型
testConditionBOOLEAN
valueTrue可以为任意类型,但valueTrue和valueFalseOrNull类型需保持一致。
valueFalseOrNull可以为任意类型,但valueTrue和valueFalseOrNull类型需保持一致。
select if(1<2,'正常情况','错误答案');

3.5.5 case when

CASE WHEN a THEN b [WHEN c THEN d]* [ELSE e] END

如果表达式a为TRUE,则返回b;如果表达式a为FALSE、c为TRUE,则返回d;如果表达式a和c都为FALSE,则返回e。

CASE WHEN返回常量字符串时,会在字符串后面补全空格。例如,当满足else条件时,返回值ios
后面会多几个空格。

解决方法:

  • 利用TRIM函数去除空格,该示例中,涉及os
    的字段都改为TRIM(os)

  • 利用CAST函数去除空格,将常量字符串转为VARCHAR类型。

select 
case when '123'='123' then '确实相等' else '不相等' end;

3.5.6 nullif

NULLIF(A,B)

两个参数的值相同,则返回null,值不同则返回第一个参数的值。

select nullif('123','123'); -- null
select nullif('123',''); -- 123
select nullif('','123'); -- 空

3.5.7 coalesce

COALESCE(A,B,...)
参数数据类型
A任意数据类型
B任意数据类型

返回列表中第一个非null的值,返回值类型和参数类型相同。如果列表中所有的值都是null,则返回null。

select coalesce(null,'123');

3.6 类型转换函数

3.6.1 cast

返回一个被强制转换为类型类型的新值。例如,CAST('42' AS INT) 返回 42;CAST(NULL AS VARCHAR) 返回 VARCHAR 类型的 NULL。

select cast('123' as int);

3.6.2 typeof

返回输入表达式的数据类型的字符串表示形式。默认情况下,返回的字符串是一个摘要字符串,可能会为了可读性而省略某些细节。如果 force_serializable 设置为 TRUE,则该字符串表示可以保留在目录中的完整数据类型。请注意,特别是匿名的内联数据类型没有可序列化的字符串表示。在这种情况下,返回 NULL。

select typeof(cast('123' as int),true);

3.7 集合函数

3.7.1 cardinality(array)

返回数组中元素的个数。

select cardinality(array['hello','aa']);

3.7.2 array [INT ]

select array['hello','aa'][1];

3.7.3 element(array)

select element(array['hello']);

这个之允许数组中只有一个元素,感觉这个没有什么适用的价值

3.7.4 cardinality(map)

select cardinality( map['k1','v1','k2','v2'])

3.7.5 map [value ]

select map['k1','v1','k2','v2'];

3.8 聚合函数

时间函数描述
count()     count(columName)     count(distinct columName)     count(1)     count(*)count() 函数返回匹配指定条件的行数。    COUNT(column_name) 函数返回指定列的值的数目(NULL 不计入)     COUNT() 函数返回表中的记录数     COUNT(DISTINCT column_name) 函数返回指定列的不同值的数目     count(1) 最好是使用count(1) 因为大多数情况count(1)都比count()好
rank()返回值在一组值中的排名。    结果是 1 加上分区顺序中当前行之前或等于当前行的行数。    这些值将在序列中产生间隙。
dense_rank()返回值在一组值中的排名。结果是一加先前分配的等级值。    与函数 rank 不同,dense_rank 不会在排名序列中产生间隙。
row_number()根据窗口分区内行的顺序,为每一行分配一个唯一的序列号,从一开始。    ROW_NUMBER 和 RANK 相似。ROW_NUMBER 按顺序对所有行进行编号     (例如 1、2、3、4、5)。    RANK 为平局提供相同的数值(例如 1、2、2、4、5)。
lead返回窗口中当前行之后第 offsetth  行处的表达式值。    offset 的默认值为 1,default 的默认值为 NULL。
lag返回窗口中当前行之前第 offsetth  行处的表达式值。    offset 的默认值为 1,default 的默认值为 NULL。
first_value返回一组有序值中的第一个值。
last_value返回一组有序值中的最后一个值。
listagg连接字符串表达式的值并在它们之间放置分隔符值。    字符串末尾不添加分隔符。分隔符的默认值为“,”。

4、udf

    package com.wmy.flink.sql.flinksql.fun.udf;


    import lombok.*;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    import org.apache.flink.table.functions.ScalarFunction;


    import static org.apache.flink.table.api.Expressions.$;
    import static org.apache.flink.table.api.Expressions.call;




    /**
    * @project_name: flinkDemo
    * @package_name: com.wmy.flink.sql.flinksql.fun.udf
    * @Author: wmy
    * @Date: 2021/10/7
    * @Major: 数据科学与大数据技术
    * @Post:大数据实时开发
    * @Email:wmy_2000@163.com
    * @Desription: flink UDF 用户案例
    * @Version: wmy-version-01
    */
    public class Flink_SQL_Fun_UDF_01 {
    public static void main(String[] args) throws Exception {


    System.out.println("Flink_SQL_Fun_UDF_01 >>> 2021-10-07");


    // 创建流式环境和表环境
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    EnvironmentSettings environmentSettings = EnvironmentSettings
    .newInstance()
    .inStreamingMode()
    .useBlinkPlanner()
    .build();
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, environmentSettings);


    // 读取数据
    DataStreamSource<String> socketTextStream = env.socketTextStream("flink01", 8888);


    // 对数据进行转换
    SingleOutputStreamOperator<User> waterSensorDS = socketTextStream
    .map(new MapFunction<String, User>() {
    @Override
    public User map(String value) throws Exception {
    String[] fields = value.split(",");
    return User.builder()
    .id(fields[0])
    .name(fields[1])
    .age(Integer.parseInt(fields[2]))
    .build();
    }
    });


    //3.将流转换为动态表
    Table table = tableEnv.fromDataStream(waterSensorDS);


    // 不注册就进行使用
    //table.select($("id"), $("name"), call(UserNameLength.class, $("name")).as("nameLength"), $("age")).execute().print();


    // 注册在进行使用
    tableEnv.createTemporarySystemFunction("nameLength", UserNameLength.class);
    tableEnv.sqlQuery("select id,name,age,nameLength(name) as nameLength from " + table).execute().print();


    // 执行流式环境
    env.execute("Flink_SQL_Fun_UDF_01 >>> 2021-10-07");
    }


    // define function logic
    public static class UserNameLength extends ScalarFunction {
    public int eval(String value) {
    return value.length();
    }
    }


    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    @Builder
    @ToString
    public static class User {
    private String id;
    private String name;
    private int age;
        }
    }



    欢迎大家关注我的微信公众号

    我会每天分享我的学习笔记和资料,大家也可以多多留言,共同进步。

    文章转载自骚明的大数据之旅,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

    评论