暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Flink SQL Json格式实战

大数据从业者 2020-12-21
4795

概述

Flink sql支持基于JSON格式从kafka消费数据并写入到kafka。目前,JSON schema是依赖表schema派生的,还不支持显示的设置Json schema。Flink Json格式使用jackson解析和生成Json字符串。如果打算使用Json格式进行数据序列化和反序列化,均需要flink-json依赖

    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-json -->
    <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>${flink.version}</version>
    </dependency>

    Json格式参数说明

    详解:

    format:必选参数,默认为none,这里肯定要设置为json

    json.fail-on-missing-field:可选参数,默认false。当数据中缺少字段时,是否失败。如表table(a int, b string),数据是{"a":1}。false表示不失败,缺失字段设为null;true则表示失败,触发job重启或者退出。

    json.ignore-parse-errors可选参数,默认false。当字段类型和数据格式解析失败,是否通过设置为null的方式规避错误数据。注意:这里是有个不太理想的状况,当数据是使用事件时间进行处理时候,该种规避方式会将用于提取事件时间的字段设置为null,这在flink sql内核中是不允许的,规避无效,job会退出或者重启(关于这一点后续会专门发文)。
    json.timestamp-format.standard:可选参数,默认SQL,指定输入输出所使用的timestamp格式。SQL对应于yyyy-MM-dd HH:mm:ss.s{precision}如2020-12-30 12:13:14.123。也可以设置为ISO-8601,对应于yyyy-MM-ddTHH:mm:ss.s{precision}2020-12-30T12:13:14.123

    数据类型对应关系

    实战案例代码

      public static void main(String[] args) throws Exception {
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      env.setParallelism(1);
      env.setRestartStrategy(RestartStrategies.noRestart());
      logger.info(env.getConfig().toString());
      StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);


      String sourceDDL = "CREATE TABLE json_source (" +
      "user_id INT, " +
      "product STRING," +
      "ts timestamp(3)," +
      "watermark for ts as ts - interval '5' second" +
      ") WITH (\n" +
      " 'connector' = 'kafka',\n" +
      " 'scan.startup.mode' = 'latest-offset',\n" +
      " 'topic' = 'json_source',\n" +
      " 'properties.bootstrap.servers' = 'felixzh:9092',\n" +
      " 'properties.group.id' = 'testGroup',\n" +
      " 'format' = 'json',\n" +
      " 'json.fail-on-missing-field' = 'false',\n" +
      " 'json.ignore-parse-errors' = 'true'\n" +
      ")";


      String sinkDDL = "CREATE TABLE sink (user_id INT,product STRING) WITH (\n" +
      " 'connector' = 'kafka',\n" +
      " 'topic' = 'sink',\n" +
      " 'properties.bootstrap.servers' = 'felixzh:9092',\n" +
      " 'format' = 'json'\n" +
      ")";


      String transformSQL = "insert into sink(user_id,product) SELECT user_id,product FROM json_source ";


      tableEnv.executeSql(sourceDDL);
      tableEnv.executeSql(sinkDDL);
      tableEnv.executeSql(transformSQL);
      }

      完整源码github地址:

      https://github.com/felixzh2020/felixzh-learning-flink/tree/master/format/src/main/java/com/felixzh/flink/format/json

      实战案例结果


      文章转载自大数据从业者,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

      评论