暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

省医保又又又来活了:Flink UDF使用指南,你竟然不会用?

大数据从业者 2022-07-09
990

前言

    Flink TableAPI & SQL允许用户使用functions对数据进行各种transformations操作,这个也是计算引擎的基本能力。

     Flink functions可以分为system函数(即内置函数)、catalog函数;system函数没有命名空间,直接使用名称引用;catalog函数有catalog、database命名空间(作用域有限制:catalog.db.func或db.func)。

      Flink functions也可以分为temporary functions(临时函数)、persistent functions(永久函数);临时函数的生命周期仅限于当前session,由用户创建;而永久函数生命周期为所有session,由内置函数或catalog函数提供。

      温馨提示:微信公众号私信有时效性、过期不可回复,很多朋友私信我没及时看到、未能回复。最近新建钉钉群(44703541),欢迎朋友加入!

系统内置函数

     Flink提供丰富的内置函数:Scalar Functions、Aggregate Functions、Column Functions、Time Interval and Point Unit Specifiers等。详见

https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/functions/systemfunctions/#system-built-in-functions

如果特定业务场景使用内置函数无法满足开发需求的话,可以自行实现User-defined Functions(即我们常说的UDF函数)。

UDF函数

JVM(Java/Scala)、Python都支持用户自定义函数。

Flink UDF支持如下:

    Scalar functions:映射scalar值为新scalar值
    Table functions:映射scalar值为新rows
    Aggregate functions:映射多个rows的scalar值为新scalar值
    Table aggregate functions:映射多个rows的scalar值为新rows
    Async table functions:table source执行查询操作

    DataStream UDF实战案例

         通过实现MapFunction接口,实现一个我们测试用的DataStream UDF函数MyMapFunction1。

      public class MyMapFunction1 implements MapFunction{
      @Override
      public String map(String s) throws Exception {
      return s + " " + new Date();
      }
      }

             编译打包为UserDefinedFunctions-1.0.jar

            实际开发有两种使用该UDF的方法:直接new或者反射。当然推荐反射方式,这样UDF迭代更新时,业务代码并不需要修改、仅需重启即可。  

        //方法1:通过反射newInstance实例化对象。优点:解耦、可配置类名称
        String mapFunctionClassName = null;
        if (args.length == 1 && args[0] != null) {
        mapFunctionClassName = args[0];
        } else {
        mapFunctionClassName = "MyMapFunction1";
        }
        Class myMapFunction = Class.forName(mapFunctionClassName);
        Object object = myMapFunction.newInstance();
                socketData.map((MapFunction Object>) object).print()


        //方法2:通过new实例化对象。缺点:耦合固定类名称
        //socketData.map(new MyMapFunction()).print();
        //socketData.map(new MyMapFunction1()).print();

             完整案例见:

        https://github.com/felixzh2020/felixzh-learning-flink/blob/master/FlinkIdeaDemo/src/main/java/NCFlinkUDFIdea.java

           启动命令:

          [root@felixzh home]# nc -l 4444
          [root@felixzh home]# java -cp FlinkIdeaDemo-1.0.jar:UserDefinedFunctions-1.0.jar NCFlinkIdea MyMapFunction1

          从上图可以看出UDF结果为输入值+当前时间,符合预期。

          SQL UDF实战案例

          通过继承ScalarFunction类,实现一个SQL UDF函数JsonFunction。

              /**
            * 标量函数:必要有eval方法。
            * 方法名必须为eval,输入参数类型和个数自定义、返回值类型自定义
            */
            public static class JsonFunction extends ScalarFunction {
            public String eval(String line) {
            return line + ":" + line;
            }
            }

            StreamTableEnvironment实例执行SQL语句之前,需要先注册UDF函数:

              //注册的Function名称自定义任意值
              tEnv.createTemporarySystemFunction("JsonFunction1", JsonFunction.class);

              然后,再跟之前一样执行SQL语句:

                // execute a Flink SQL job and print the result locally
                tEnv.executeSql(
                / define the aggregation
                "SELECT JsonFunction1(word), frequency\n"
                // read from an artificial fixed-size table with rows and columns
                + "FROM (\n"
                + " VALUES ('Hello', 1), ('Ciao', 1), ('felixzh', 2)\n"
                + ")\n"
                // name the table and its columns
                + "AS WordTable(word, frequency)\n")
                .print();

                完整案例见:

                https://github.com/felixzh2020/felixzh-learning-flink/blob/master/FlinkIdeaDemo/src/main/java/NCFlinkSqlUDFIdea.java

                从上图可知UDF处理结果为字段值+字段值,符合预期。


                文章转载自大数据从业者,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                评论