
Flink CDC to Doris, StarRocks, and Table Store

大数据启示录 2022-12-10


Flink CDC to Doris


 Main class that syncs a whole MySQL database to Doris with Flink CDC:

    import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.JSONObject;
    import com.ververica.cdc.connectors.mysql.source.MySqlSource;
    import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
    import org.apache.doris.flink.cfg.DorisExecutionOptions;
    import org.apache.doris.flink.cfg.DorisOptions;
    import org.apache.doris.flink.cfg.DorisReadOptions;
    import org.apache.doris.flink.sink.DorisSink;
    import org.apache.doris.flink.sink.writer.SimpleStringSerializer;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.functions.FilterFunction;
    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.util.Collector;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;


    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.Properties;
    import java.util.UUID;


    /**
    *
    * Synchronize the full database through flink cdc
    *
    */
    public class DatabaseFullSync {
    private static final Logger LOG = LoggerFactory.getLogger(DatabaseFullSync.class);
    private static String HOST = "127.0.0.1";
    private static String MYSQL_PASSWD = "password";
    private static int MYSQL_PORT = 3306;
    private static int DORIS_PORT = 8030;
    private static String MYSQL_USER = "root";




    private static String SYNC_DB = "test";
    private static String SYNC_TBLS = "test.*";
    private static String TARGET_DORIS_DB = "test";


    public static void main(String[] args) throws Exception {
    MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
    .hostname(HOST)
    .port(MYSQL_PORT)
    .databaseList(SYNC_DB) // set captured database
    .tableList(SYNC_TBLS) // set captured tables
    .username(MYSQL_USER)
    .password(MYSQL_PASSWD)
    .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
    .build();


    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    // enable checkpoint
    env.enableCheckpointing(10000);


    DataStreamSource<String> cdcSource = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL CDC Source");


    // get table list
    List<String> tableList = getTableList();
    LOG.info("sync table list:{}",tableList);
    for(String tbl : tableList){
    SingleOutputStreamOperator<String> filterStream = filterTableData(cdcSource, tbl);
    SingleOutputStreamOperator<String> cleanStream = clean(filterStream);
    DorisSink<String> dorisSink = buildDorisSink(tbl);
    cleanStream.sinkTo(dorisSink).name("sink " + tbl);
    }
    env.execute("Full Database Sync ");
    }


    /**
    * Get real data
    * {
    * "before":null,
    * "after":{
    * "id":1,
    * "name":"zhangsan-1",
    * "age":18
    * },
    * "source":{
    * "db":"test",
    * "table":"test_1",
    * ...
    * },
    * "op":"c",
    * ...
    * }
    * */
    private static SingleOutputStreamOperator<String> clean(SingleOutputStreamOperator<String> source) {
    return source.flatMap(new FlatMapFunction<String,String>(){
    @Override
    public void flatMap(String row, Collector<String> out) throws Exception {
    try{
    JSONObject rowJson = JSON.parseObject(row);
    String op = rowJson.getString("op");
    // history (initial snapshot read), insert, update
    if(Arrays.asList("r","c","u").contains(op)){
    out.collect(rowJson.getJSONObject("after").toJSONString());
    }else{
    LOG.info("filter other op:{}",op);
    }
    }catch (Exception ex){
    LOG.warn("filter other format binlog:{}",row);
    }
    }
    });
    }


    /**
    * Divide according to tablename
    * */
    private static SingleOutputStreamOperator<String> filterTableData(DataStreamSource<String> source, String table) {
    return source.filter(new FilterFunction<String>() {
    @Override
    public boolean filter(String row) throws Exception {
    try {
    JSONObject rowJson = JSON.parseObject(row);
    JSONObject sourceJson = rowJson.getJSONObject("source");
    String tbl = sourceJson.getString("table");
    return table.equals(tbl);
    }catch (Exception ex){
    ex.printStackTrace();
    return false;
    }
    }
    });
    }


    /**
    * Get all MySQL tables that need to be synchronized
    * */
    private static List<String> getTableList() {
    List<String> tables = new ArrayList<>();
    String sql = "SELECT TABLE_SCHEMA,TABLE_NAME FROM information_schema.tables WHERE TABLE_SCHEMA = '" + SYNC_DB + "'";
    List<JSONObject> tableList = JdbcUtil.executeQuery(HOST, MYSQL_PORT, MYSQL_USER, MYSQL_PASSWD, sql);
    for(JSONObject jsob : tableList){
    String schemaName = jsob.getString("TABLE_SCHEMA");
    String tblName = jsob.getString("TABLE_NAME");
    String schemaTbl = schemaName + "." + tblName;
    if(schemaTbl.matches(SYNC_TBLS)){
    tables.add(tblName);
    }
    }
    return tables;
    }


    /**
    * create doris sink
    * */
    public static DorisSink<String> buildDorisSink(String table){
    DorisSink.Builder<String> builder = DorisSink.builder();
    DorisOptions.Builder dorisBuilder = DorisOptions.builder();
    dorisBuilder.setFenodes(HOST + ":" + DORIS_PORT)
    .setTableIdentifier(TARGET_DORIS_DB + "." + table)
    .setUsername("root")
    .setPassword("");


    Properties pro = new Properties();
    // json data format
    pro.setProperty("format", "json");
    pro.setProperty("read_json_by_line", "true");
    DorisExecutionOptions executionOptions = DorisExecutionOptions.builder()
    .setLabelPrefix("label-" + table + UUID.randomUUID()) // stream load label prefix, must be unique
    .setStreamLoadProp(pro).build();


    builder.setDorisReadOptions(DorisReadOptions.builder().build())
    .setDorisExecutionOptions(executionOptions)
    .setSerializer(new SimpleStringSerializer()) // serialize records as plain strings
    .setDorisOptions(dorisBuilder.build());


    return builder.build();
    }
    }
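
One caveat about clean() above: it forwards only "r", "c", and "u" events, so deletes never reach Doris. Below is a hedged sketch of a variant that forwards deletes as well, assuming the target is a Doris Unique Key table and that the load is configured to apply Doris's hidden __DORIS_DELETE_SIGN__ column (the exact stream load properties needed depend on your Doris version):

    // Hedged sketch, not part of the original job: forward deletes by attaching
    // Doris's hidden delete-sign column (0 = upsert, 1 = delete).
    private static SingleOutputStreamOperator<String> cleanWithDelete(SingleOutputStreamOperator<String> source) {
    return source.flatMap((String row, Collector<String> out) -> {
    JSONObject rowJson = JSON.parseObject(row);
    String op = rowJson.getString("op");
    if (Arrays.asList("r", "c", "u").contains(op)) {
    JSONObject after = rowJson.getJSONObject("after");
    after.put("__DORIS_DELETE_SIGN__", 0);
    out.collect(after.toJSONString());
    } else if ("d".equals(op)) {
    // for deletes, the row image lives under "before"
    JSONObject before = rowJson.getJSONObject("before");
    before.put("__DORIS_DELETE_SIGN__", 1);
    out.collect(before.toJSONString());
    }
    }).returns(String.class);
    }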

    The JdbcUtil class:

      import com.alibaba.fastjson.JSONObject;
      import org.slf4j.Logger;
      import org.slf4j.LoggerFactory;


      import java.sql.Connection;
      import java.sql.DriverManager;
      import java.sql.PreparedStatement;
      import java.sql.ResultSet;
      import java.sql.ResultSetMetaData;
      import java.sql.SQLException;
      import java.util.ArrayList;
      import java.util.List;


      public class JdbcUtil {


      static {
      try {
      Class.forName("com.mysql.jdbc.Driver");
      } catch (ClassNotFoundException e) {
      e.printStackTrace();
      }
      }


      private static final Logger LOG = LoggerFactory.getLogger(JdbcUtil.class);


      public static void main(String[] args) throws SQLException {
      }


      public static List<JSONObject> executeQuery(String hostUrl, int port, String user, String password, String sql){
      List<JSONObject> beJson = new ArrayList<>();
      String connectionUrl = String.format("jdbc:mysql://%s:%s/",hostUrl,port);
      Connection con = null;
      try {
      con = DriverManager.getConnection(connectionUrl,user,password);
      PreparedStatement ps = con.prepareStatement(sql);
      ResultSet rs = ps.executeQuery();
      beJson = resultSetToJson(rs);
      } catch (SQLException e) {
      e.printStackTrace();
      } catch (Exception e) {
      e.printStackTrace();
      } finally {
      try {
      if (con != null) { con.close(); }
      } catch (Exception e) {
      }
      }
      return beJson;
      }


      private static List<JSONObject> resultSetToJson(ResultSet rs) throws SQLException {
      List<JSONObject> list = new ArrayList<>();
      ResultSetMetaData metaData = rs.getMetaData();
      int columnCount = metaData.getColumnCount();
      while (rs.next()) {
      JSONObject jsonObj = new JSONObject();
      for (int i = 1; i <= columnCount; i++) {
      String columnName = metaData.getColumnLabel(i);
      String value = rs.getString(columnName);
      jsonObj.put(columnName, value);
      }
      list.add(jsonObj);
      }
      return list;
      }
      }

      Flink CDC to StarRocks


      The overall flow:

      1. Flink CDC captures change data from MySQL

      2. The captured changes are converted to JSON

      3. The database, table, and row data are extracted from the JSON

      4. The stream is keyed by database and table

      5. A process function handles each table's data, buffering rows in state; when the buffer reaches a size threshold or has been held for a time interval (a timer covers the time-triggered case), the batch is flushed

      6. The sink assembles each batch and writes it to StarRocks via Stream Load



      Main class: CdcToStarRocks

        The main flow is simple: source -> map -> keyBy -> process -> sink. The source reads the MySQL binlog (or a full snapshot plus incremental changes), map converts the data format, keyBy partitions the data by database name plus table name, process batches each table's data, and the sink writes the batches to StarRocks.

        import com.venn.source.mysql.cdc.CommonStringDebeziumDeserializationSchema;
        import com.ververica.cdc.connectors.mysql.source.MySqlSource;
        import com.ververica.cdc.connectors.mysql.table.StartupOptions;
        import org.apache.flink.api.common.eventtime.WatermarkStrategy;
        import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;


        import java.util.Properties;


        /**
        * mysql cdc demo
        * <p>
        * Whole-database sync from MySQL CDC to StarRocks.
        * <p>
        * Limitations:
        * 1. Schema changes on the source are not yet propagated to the StarRocks tables
        * 2. For efficiency, whether a table exists on the target side is checked only when the table's data first arrives; once a table is judged missing, all of its later changes are ignored
        * 3. Tables that are not loaded are filtered only in the sink, so the upstream operators still process them; the filtering could instead be done in the deserializer or the map function
        */
        public class CdcToStarRocks {


        // max rows per batch and max wait time per batch
        private static int batchSize = 10000;
        private static long batchInterval = 10 * 1000;


        public static void main(String[] args) throws Exception {


        String ip = "localhost";
        int port = 3306;
        String db = "venn";
        // String table = "venn.user_log";
        String table = "venn.*";
        String user = "root";
        String pass = "123456";


        String starrocksIp = "10.201.0.230";
        String starrocksPort = "29030";
        String starrocksLoadPort = "28030";
        String starrocksUser = "root";
        String starrocksPass = "123456";
        String starrocksDb = "test";


        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);


        MySqlSource<String> sourceFunction = MySqlSource.<String>builder()
        .hostname(ip)
        .port(port)
        // capture all tables of the listed database(s)
        .databaseList(db)
        .tableList(table)
        .username(user)
        .password(pass)
        .startupOptions(StartupOptions.latest())
        // do not cache schema change
        // .includeSchemaChanges(false)
        // .startupOptions(StartupOptions.initial())
        // custom deserializer that parses records into JSON strings
        .deserializer(new CommonStringDebeziumDeserializationSchema(ip, port))
        .build();


        env
        .fromSource(sourceFunction, WatermarkStrategy.noWatermarks(), "cdc")
        .name("source")
        .uid("source")
        // convert the JSON string to a CdcRecord
        .map(new CdcStarMapFunction())
        .name("map")
        .keyBy( record -> record.getDb() + "_" + record.getTable())
        .process(new CdcStarProcessFunction(batchSize, batchInterval))
        .name("process")
        .uid("process")
        .addSink(new StarRocksSink(starrocksIp, starrocksPort, starrocksLoadPort, starrocksUser, starrocksPass, starrocksDb))
        .name("sink");


        env.execute("cdcToStarRocks");
        }
        }

        Deserializer: CommonStringDebeziumDeserializationSchema

        The deserializer reuses a generic Flink CDC deserializer written earlier. It implements DebeziumDeserializationSchema and extracts the database, table, operation type, and row data from each record. A few points need special attention:

        1. For insert operations, only the data under after is needed

        2. For update operations, both before and after can be parsed: before holds the pre-update values and after the post-update values; if the pre-image is not needed, taking only after is enough

        3. For delete operations, the data under before is needed

        4. DDL operations would need special handling (not covered here)

          import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
          import com.google.gson.JsonObject;
          import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
          import org.apache.flink.api.common.typeinfo.TypeInformation;
          import org.apache.flink.util.Collector;
          import org.apache.kafka.connect.data.Field;
          import org.apache.kafka.connect.data.Struct;
          import org.apache.kafka.connect.source.SourceRecord;


          /**
          * deserialize debezium format binlog
          */
          public class CommonStringDebeziumDeserializationSchema implements DebeziumDeserializationSchema<String> {


          private String host;
          private int port;




          public CommonStringDebeziumDeserializationSchema(String host, int port) {
          this.host = host;
          this.port = port;
          }


          public void deserialize(SourceRecord record, Collector<String> out) {
          JsonObject jsonObject = new JsonObject();


          String binlog = record.sourceOffset().get("file").toString();
          String offset = record.sourceOffset().get("pos").toString();
          String ts_sec = record.sourceOffset().get("ts_sec").toString();


          // System.out.println("binlog : " + binlog + ", offset = " + offset);
          // TODO: handle schema change


          jsonObject.addProperty("host", host);
          // add meta
          jsonObject.addProperty("binlog", binlog);
          jsonObject.addProperty("offset", offset);
          jsonObject.addProperty("ts_sec", ts_sec);
          jsonObject.addProperty("port", port);
          jsonObject.addProperty("file", (String) record.sourceOffset().get("file"));
          jsonObject.addProperty("pos", (Long) record.sourceOffset().get("pos"));
          jsonObject.addProperty("ts_sec", (Long) record.sourceOffset().get("ts_sec"));
          String[] name = record.valueSchema().name().split("\\.");
          jsonObject.addProperty("db", name[1]);
          jsonObject.addProperty("table", name[2]);
          Struct value = ((Struct) record.value());
          String operatorType = value.getString("op");
          jsonObject.addProperty("operator_type", operatorType);
          // c: create, u: update, d: delete, r: read (snapshot)
          // insert & update
          if (!"d".equals(operatorType)) {
          Struct after = value.getStruct("after");
          JsonObject afterJsonObject = parseRecord(after);
          jsonObject.add("after", afterJsonObject);
          }
          // update & delete
          if ("u".equals(operatorType) || "d".equals(operatorType)) {
          Struct source = value.getStruct("before");
          JsonObject beforeJsonObject = parseRecord(source);
          jsonObject.add("before", beforeJsonObject);
          }
          jsonObject.addProperty("parse_time", System.currentTimeMillis() 1000);


          out.collect(jsonObject.toString());
          }


          private JsonObject parseRecord(Struct after) {
          JsonObject jo = new JsonObject();
          for (Field field : after.schema().fields()) {
          switch ((field.schema()).type()) {
          case INT8:
          int resultInt8 = after.getInt8(field.name());
          jo.addProperty(field.name(), resultInt8);
          break;
          case INT16:
          Short resultInt16 = after.getInt16(field.name());
          jo.addProperty(field.name(), resultInt16);
          break;
          case INT32:
          Integer resultInt32 = after.getInt32(field.name());
          jo.addProperty(field.name(), resultInt32);
          break;
          case INT64:
          Long resultInt = after.getInt64(field.name());
          jo.addProperty(field.name(), resultInt);
          break;
          case FLOAT32:
          Float resultFloat32 = after.getFloat32(field.name());
          jo.addProperty(field.name(), resultFloat32);
          break;
          case FLOAT64:
          Double resultFloat64 = after.getFloat64(field.name());
          jo.addProperty(field.name(), resultFloat64);
          break;
          case BYTES:
          // byte columns are not decoded here; String.valueOf(byte[]) just stores the array's toString
          byte[] resultByte = after.getBytes(field.name());
          jo.addProperty(field.name(), String.valueOf(resultByte));
          break;
          case STRING:
          String resultStr = after.getString(field.name());
          jo.addProperty(field.name(), resultStr);
          break;
          default:
          }
          }


          return jo;
          }


          public TypeInformation<String> getProducedType() {
          return BasicTypeInfo.STRING_TYPE_INFO;
          }
          }

          Map function: CdcStarMapFunction

          CdcStarMapFunction is straightforward: it converts the JSON string into a CdcRecord object, keeping only the database, table, operation type, and row data.

          When extracting the data, inserts and updates take only the after values (deletes take before, as the code shows).

            import java.util.LinkedHashMap;
            import java.util.Map;
            import lombok.Data;
            /**
            * POJO holding one CDC record
            */
            @Data
            public class CdcRecord {
            private String db;
            private String table;
            private String op;
            private Map<String, String> data = new LinkedHashMap<>();


            // no-arg constructor keeps CdcRecord a valid Flink POJO
            public CdcRecord() {
            }


            // explicit constructor used by CdcStarMapFunction (@Data alone would not generate it)
            public CdcRecord(String db, String table, String op) {
            this.db = db;
            this.table = table;
            this.op = op;
            }
            }


              import com.google.gson.JsonElement;
              import com.google.gson.JsonObject;
              import com.google.gson.JsonParser;
              import org.apache.flink.api.common.functions.RichMapFunction;
              import org.apache.flink.configuration.Configuration;
              import org.slf4j.Logger;
              import org.slf4j.LoggerFactory;


              import java.util.Map;


              public class CdcStarMapFunction extends RichMapFunction<String, CdcRecord> {


              private final static Logger LOG = LoggerFactory.getLogger(CdcStarMapFunction.class);
              private JsonParser parser;


              @Override
              public void open(Configuration parameters) throws Exception {
              parser = new JsonParser();
              }


              @Override
              public CdcRecord map(String element) throws Exception {


              LOG.debug("data : {}" , element );
              JsonObject object = parser.parse(element).getAsJsonObject();
              String db = object.get("db").getAsString();
              String table = object.get("table").getAsString();
              String op = object.get("operator_type").getAsString();


              CdcRecord record = new CdcRecord(db, table, op);


              // insert/update: row data lives under "after"
              String dataLocation = "after";
              if("d".equals(op)){
              // if op is delete, read "before"
              dataLocation = "before";
              }


              JsonObject data = object.get(dataLocation).getAsJsonObject();


              for(Map.Entry<String, JsonElement> entry: data.entrySet()){


              String columnName = entry.getKey();
              String columnValue;
              JsonElement value = entry.getValue();
              if(!value.isJsonNull()){
              // if column value is not null, get it as string
              columnValue = value.getAsString();
              // put column name/value into record.data
              record.getData().put(columnName, columnValue);
              }


              }


              return record;
              }
              }


              Process function: CdcStarProcessFunction

              CdcStarProcessFunction has three parts:

              1. Three pieces of state: cacheTimer, cacheSize, and cache, holding the next timer's trigger time, the number of buffered rows, and the buffered rows themselves

              2. processElement handles each table's data. When the first record for a key arrives, it registers a processing-time timer at current time + batchInterval. Records are appended to cache; once the count reaches batchSize, flushData emits the batch downstream, the pending timer is deleted, and the state is cleared

              3. When the timer fires, flushData emits the buffered batch downstream and clears the state

                import org.apache.flink.api.common.state.ListState;
                import org.apache.flink.api.common.state.ListStateDescriptor;
                import org.apache.flink.api.common.state.ValueState;
                import org.apache.flink.api.common.state.ValueStateDescriptor;
                import org.apache.flink.api.common.typeinfo.TypeInformation;
                import org.apache.flink.configuration.Configuration;
                import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
                import org.apache.flink.util.Collector;
                import org.slf4j.Logger;
                import org.slf4j.LoggerFactory;


                import java.util.ArrayList;
                import java.util.Iterator;
                import java.util.List;


                public class CdcStarProcessFunction extends KeyedProcessFunction<String, CdcRecord, List<CdcRecord>> {


                private final static Logger LOG = LoggerFactory.getLogger(CdcStarProcessFunction.class);
                private int batchSize;
                private long batchInterval;
                // next timer trigger time
                private ValueState<Long> cacheTimer;
                // current cache size
                private ValueState<Integer> cacheSize;
                // cached data
                private ListState<CdcRecord> cache;


                public CdcStarProcessFunction(int batchSize, long batchInterval) {
                this.batchSize = batchSize;
                this.batchInterval = batchInterval;
                }


                @Override
                public void open(Configuration parameters) throws Exception {


                ListStateDescriptor<CdcRecord> cacheDescriptor = new ListStateDescriptor<>("cache", TypeInformation.of(CdcRecord.class));
                cache = getRuntimeContext().getListState(cacheDescriptor);


                ValueStateDescriptor<Integer> cacheSizeDescriptor = new ValueStateDescriptor<>("cacheSize", Integer.class);
                cacheSize = getRuntimeContext().getState(cacheSizeDescriptor);


                ValueStateDescriptor<Long> cacheTimerDescriptor = new ValueStateDescriptor<>("cacheTimer", Long.class);
                cacheTimer = getRuntimeContext().getState(cacheTimerDescriptor);
                }


                @Override
                public void processElement(CdcRecord element, KeyedProcessFunction<String, CdcRecord, List<CdcRecord>>.Context ctx, Collector<List<CdcRecord>> out) throws Exception {


                // cache size + 1
                if (cacheSize.value() != null) {
                cacheSize.update(cacheSize.value() + 1);
                } else {
                cacheSize.update(1);
                // add timer for interval flush
                long nextTimer = System.currentTimeMillis() + batchInterval;
                LOG.debug("register timer : {} , key : {}", nextTimer, ctx.getCurrentKey());
                cacheTimer.update(nextTimer);
                ctx.timerService().registerProcessingTimeTimer(nextTimer);
                }
                // add data to cache state
                cache.add(element);
                // cache size max than batch Size
                if (cacheSize.value() >= batchSize) {
                // remove next timer
                long nextTimer = cacheTimer.value();
                LOG.debug("{} remove timer, key : {}", nextTimer, ctx.getCurrentKey());
                ctx.timerService().deleteProcessingTimeTimer(nextTimer);
                // flush data to down stream
                flushData(out);
                }
                }


                /**
                * flush data to down stream
                */
                private void flushData(Collector<List<CdcRecord>> out) throws Exception {
                List<CdcRecord> tmpCache = new ArrayList<>();
                Iterator<CdcRecord> it = cache.get().iterator();
                while (it.hasNext()) {
                tmpCache.add(it.next());
                }
                if (tmpCache.size() > 0) {
                out.collect(tmpCache);


                // finish flush all cache data, clear state
                cache.clear();
                cacheSize.clear();
                cacheTimer.clear();
                }
                }


                @Override
                public void onTimer(long timestamp, KeyedProcessFunction<String, CdcRecord, List<CdcRecord>>.OnTimerContext ctx, Collector<List<CdcRecord>> out) throws Exception {
                LOG.info("{} trigger timer to flush data", ctx.getCurrentKey(), timestamp);
                // batch interval trigger flush data
                flushData(out);
                }


                @Override
                public void close() throws Exception {
                }
                }

                Sink: StarRocksSink

                StarRocksSink is a bit more involved. Based on the table name in the data, it fetches the matching table schema from the target database (cached in memory to avoid a query per batch), pulls each column's value from the data in the target table's column order, and appends the operation type to every row.

                • When writing to StarRocks, StarRocksSink implements both upsert and delete by appending 0/1 to each row: 0 means UPSERT and 1 means DELETE (see the format sketch below)

                • See reference document 1
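
                To make the row format concrete before diving into the code, here is a small illustrative sketch (table and values invented): column values are joined by the column separator, the __op flag is appended last in each row, and rows are joined by the row separator:

                  // Illustrative only: the payload layout parseUploadData (below) produces for a
                  // table with columns (id, name). Row 1 upserts (1, a); row 2 deletes (2, b).
                  public class PayloadFormatDemo {
                  public static void main(String[] args) {
                  String colSep = "\\\\x01"; // same literal as StarRocksSink.COL_SEP
                  String rowSep = "\\\\x02"; // same literal as StarRocksSink.ROW_SEP
                  String payload =
                  "1" + colSep + "a" + colSep + "0" + rowSep // upsert -> __op = 0
                  + "2" + colSep + "b" + colSep + "1"; // delete -> __op = 1
                  System.out.println(payload);
                  }
                  }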

                The invoke method

                The core logic of StarRocksSink lives in invoke:

                1. Take the database and table from the data and join them into a cache key

                2. Get the target table's schema (whole-database mapping, so source and target table names match): first from the cache, then from the database if absent

                3. Assemble the row data

                4. Build the Stream Load URL

                5. Write the data to StarRocks over HTTP

                The loadTargetTableSchema method

                Runs desc db.table to fetch the target table's schema and assembles two results: all column names joined with "," plus a trailing "__op", used in the HTTP request header to describe the loaded columns; and an ordered list of the columns, used to pull the matching values out of the source data.

                The parseUploadData method

                Walks the target table's columns in order, pulls each column's value from the data, joins the values with the column separator, and finally appends 0/1 based on the operation type: 1 for deletes, 0 for everything else.

                The doHttp method

                Writes the data to StarRocks over HTTP; nothing special here.

                  import org.apache.commons.codec.binary.Base64;
                  import org.apache.flink.configuration.Configuration;
                  import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
                  import org.apache.http.HttpException;
                  import org.apache.http.HttpHeaders;
                  import org.apache.http.HttpRequest;
                  import org.apache.http.HttpRequestInterceptor;
                  import org.apache.http.client.methods.CloseableHttpResponse;
                  import org.apache.http.client.methods.HttpPut;
                  import org.apache.http.entity.StringEntity;
                  import org.apache.http.impl.client.CloseableHttpClient;
                  import org.apache.http.impl.client.DefaultRedirectStrategy;
                  import org.apache.http.impl.client.HttpClientBuilder;
                  import org.apache.http.impl.client.HttpClients;
                  import org.apache.http.protocol.HTTP;
                  import org.apache.http.protocol.HttpContext;
                  import org.apache.http.util.EntityUtils;
                  import org.slf4j.Logger;
                  import org.slf4j.LoggerFactory;


                  import java.io.IOException;
                  import java.nio.charset.StandardCharsets;
                  import java.sql.*;
                  import java.util.ArrayList;
                  import java.util.HashMap;
                  import java.util.List;
                  import java.util.Map;


                  public class StarRocksSink extends RichSinkFunction<List<CdcRecord>> {


                  private final static Logger LOG = LoggerFactory.getLogger(StarRocksSink.class);
                  public final static String COL_SEP = "\\\\x01";
                  public final static String ROW_SEP = "\\\\x02";
                  public final static String NULL_COL = "\\N";
                  private String ip;
                  private String port;
                  private String loadPort;
                  private String user;
                  private String pass;
                  private String db;
                  private Connection connection;
                  private Map<String, String> spliceColumnMap = new HashMap<>();
                  private Map<String, List<String>> columnMap = new HashMap<>();


                  public StarRocksSink() {
                  }


                  public StarRocksSink(String ip, String port, String loadPort, String user, String pass, String db) {
                  this.ip = ip;
                  this.port = port;
                  this.loadPort = loadPort;
                  this.user = user;
                  this.pass = pass;
                  this.db = db;


                  }


                  @Override
                  public void open(Configuration parameters) throws Exception {
                  reConnect();
                  }


                  @Override
                  public void invoke(List<CdcRecord> element, Context context) throws Exception {


                  LOG.info("write batch size: " + element.size());
                  if(element == null || element.size() ==0){
                  LOG.info("ignore empty element");
                  return;
                  }


                  // use StarRocks db name
                  // String db = cache.get(0).getDb();
                  String table = element.get(0).getTable();
                  String key = db + "_" + table;


                  // get table schema
                  List<String> columnList;
                  if (!columnMap.containsKey(key)) {
                  // first time this db.table arrives: load its columns into spliceColumnMap & columnMap
                  loadTargetTableSchema(key, db, table);
                  }
                  String columns = spliceColumnMap.get(key);
                  columnList = columnMap.get(key);
                  if (columnList.size() == 0) {
                  LOG.info("{}.{} not exists in target starrocks, ignore data change", db, table);
                  return;
                  }
                  // make up data
                  String data = parseUploadData(element, columnList);


                  final String loadUrl = String.format("http://%s:%s/api/%s/%s/_stream_load", ip, loadPort, db, table);
                  String label = db + "_" + table + "_" + System.currentTimeMillis();


                  // send data to starrocks
                  doHttp(loadUrl, data, label, columns);


                  }


                  /**
                  * http send data to starrocks
                  */
                  private void doHttp(String loadUrl, String data, String label, String columns) throws IOException, SQLException {


                  final HttpClientBuilder httpClientBuilder = HttpClients
                  .custom()
                  .setRedirectStrategy(new DefaultRedirectStrategy() {
                  @Override
                  protected boolean isRedirectable(String method) {
                  return true;
                  }
                  })
                  .addInterceptorFirst(new ContentLengthHeaderRemover());


                  try (CloseableHttpClient client = httpClientBuilder.build()) {
                  HttpPut put = new HttpPut(loadUrl);
                  StringEntity entity = new StringEntity(data, "UTF-8");
                  put.setHeader(HttpHeaders.EXPECT, "100-continue");
                  put.setHeader(HttpHeaders.AUTHORIZATION, basicAuthHeader(user, pass));
                  // the label header is optional, not necessary
                  // use label header can ensure at most once semantics
                  put.setHeader("label", label);
                  put.setHeader("columns", columns);
                  put.setHeader("row_delimiter", ROW_SEP);
                  put.setHeader("column_separator", COL_SEP);
                  put.setEntity(entity);


                  try (CloseableHttpResponse response = client.execute(put)) {
                  String loadResult = "";
                  if (response.getEntity() != null) {
                  loadResult = EntityUtils.toString(response.getEntity());
                  }
                  final int statusCode = response.getStatusLine().getStatusCode();
                  // statusCode 200 only means the StarRocks service responded, not that the stream load succeeded;
                  // check the response content to see whether the load itself succeeded
                  if (statusCode != 200) {
                  throw new IOException(
                  String.format("Stream load failed, statusCode=%s load result=%s", statusCode, loadResult));
                  }


                  }
                  }


                  }


                  private String basicAuthHeader(String username, String password) {
                  final String tobeEncode = username + ":" + password;
                  byte[] encoded = Base64.encodeBase64(tobeEncode.getBytes(StandardCharsets.UTF_8));
                  return "Basic " + new String(encoded);
                  }


                  private String parseUploadData(List<CdcRecord> cache, List<String> columnList) {
                  StringBuilder builder = new StringBuilder();
                  for (CdcRecord element : cache) {


                  Map<String, String> data = element.getData();


                  for (String column : columnList) {
                  if (data.containsKey(column)) {
                  builder.append(data.get(column)).append(COL_SEP);
                  } else {
                  // if target column not exists in source data, set as null
                  builder.append(NULL_COL).append(COL_SEP);
                  }
                  }
                  // add __op
                  if ("d".equals(element.getOp())) {
                  // delete
                  builder.append("1");
                  } else {
                  // upsert
                  builder.append("0");
                  }
                  // add row separator
                  builder.append(ROW_SEP);
                  }
                  // remove last row sep
                  builder = builder.delete(builder.length() - ROW_SEP.length(), builder.length());
                  String data = builder.toString();
                  return data;
                  }


                  /**
                  * load table schema, parse to http column and column list for load source data
                  */
                  private void loadTargetTableSchema(String key, String db, String table) throws SQLException {


                  List<String> columnList = new ArrayList<>();


                  StringBuilder builder = new StringBuilder();
                  try {
                  // load table schema
                  PreparedStatement insertPS = connection.prepareStatement("desc " + db + "." + table);
                  ResultSet result = insertPS.executeQuery();
                  while (result.next()) {
                  String column = result.getString(1);
                  builder.append(column).append(",");
                  columnList.add(column);
                  }
                  } catch (SQLException e) {
                  LOG.warn("load {}.{} schema error. {}", db, table, e.getStackTrace());
                  }
                  builer.append("__op");


                  String columns = builder.toString();


                  spliceColumnMap.put(key, columns);
                  columnMap.put(key, columnList);
                  }


                  /**
                  * reconnect to starrocks
                  *
                  * @throws SQLException
                  */
                  private void reConnect() throws SQLException {
                  String url = "jdbc:mysql://" + ip + ":" + port;
                  if (connection == null || connection.isClosed()) {
                  connection = DriverManager.getConnection(url, user, pass);
                  }
                  }


                  @Override
                  public void finish() throws Exception {
                  LOG.info("finish");
                  }


                  @Override
                  public void close() throws Exception {
                  LOG.info("close...");
                  connection.close();
                  }


                  private static class ContentLengthHeaderRemover implements HttpRequestInterceptor {
                  @Override
                  public void process(HttpRequest request, HttpContext context) throws HttpException, IOException {
                  // fighting org.apache.http.protocol.RequestContent's ProtocolException("Content-Length header already present");
                  request.removeHeaders(HTTP.CONTENT_LEN);
                  }
                  }
                  }

                  Limitations:

                  1. Schema changes on the source are not yet propagated to the StarRocks tables

                  2. For efficiency, whether a table exists on the target side is checked only when the table's data first arrives; once a table is judged missing, all of its later changes are ignored

                  3. Tables that are not loaded are filtered only in the sink, so the upstream operators still process them; the filtering could instead be done in the deserializer or the map function, as in the sketch below
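
                  For limitation 3, one option is to filter between map and keyBy. A minimal sketch, assuming the target table list can be loaded once when the filter opens (the loading query is a placeholder, not part of the original code):

                    import org.apache.flink.api.common.functions.RichFilterFunction;
                    import org.apache.flink.configuration.Configuration;

                    import java.util.HashSet;
                    import java.util.Set;

                    // Hedged sketch: drop CdcRecords whose table does not exist in the target
                    // StarRocks database, before they reach keyBy/process.
                    public class ExistingTableFilter extends RichFilterFunction<CdcRecord> {

                    private transient Set<String> existingTables;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                    existingTables = new HashSet<>();
                    // Placeholder: load the target table list once here, e.g. query
                    // information_schema.tables of the target database over JDBC.
                    }

                    @Override
                    public boolean filter(CdcRecord record) {
                    // keep only records whose table exists on the target side
                    return existingTables.contains(record.getTable());
                    }
                    }

                  It would slot in as .filter(new ExistingTableFilter()) between .map(new CdcStarMapFunction()) and .keyBy(...) in the main class.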


                  Flink CDC to Table Store


                   

                    package name.lijiaqi;

                    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
                    import org.apache.flink.table.api.EnvironmentSettings;
                    import org.apache.flink.table.api.SqlDialect;
                    import org.apache.flink.table.api.TableResult;
                    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

                    public class MysqlToTableStoreExample {
                    public static void main(String[] args) throws Exception {
                    EnvironmentSettings fsSettings = EnvironmentSettings.newInstance()
                    .inStreamingMode()
                    .build();
                    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                    env.setParallelism(1);
                    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, fsSettings);

                    tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);

                    // source table (MySQL CDC)
                    String sourceDDL =
                    "CREATE TEMPORARY TABLE ods_lineitem (\n" +
                    " l_orderkey INT NOT NULL,\n" +
                    " l_partkey INT NOT NULL,\n" +
                    " l_suppkey INT NOT NULL,\n" +
                    " l_linenumber INT NOT NULL,\n" +
                    " l_quantity DECIMAL(15, 2) NOT NULL,\n" +
                    " l_extendedprice DECIMAL(15, 2) NOT NULL,\n" +
                    " l_discount DECIMAL(15, 2) NOT NULL,\n" +
                    " l_tax DECIMAL(15, 2) NOT NULL,\n" +
                    " l_returnflag CHAR(1) NOT NULL,\n" +
                    " l_linestatus CHAR(1) NOT NULL,\n" +
                    " l_shipdate DATE NOT NULL,\n" +
                    " l_commitdate DATE NOT NULL,\n" +
                    " l_receiptdate DATE NOT NULL,\n" +
                    " l_shipinstruct CHAR(25) NOT NULL,\n" +
                    " l_shipmode CHAR(10) NOT NULL,\n" +
                    " l_comment VARCHAR(44) NOT NULL,\n" +
                    " PRIMARY KEY (l_orderkey, l_linenumber) NOT ENFORCED \n" +
                    ") WITH (\n" +
                    " 'connector' = 'mysql-cdc',\n" +
                    " 'hostname' = '127.0.0.1',\n" + -- 如果想使用 host,可以修改宿主机 /etc/hosts 加入 127.0.0.1 mysql.docker.internal
                    " 'port' = '3307',\n" +
                    " 'username' = 'flink',\n" +
                    " 'password' = 'flink',\n" +
                    " 'database-name' = 'tpch_s10',\n" +
                    " 'table-name' = 'lineitem' \n" +
                    ");"


                    String createCatalogDDL =
                    "CREATE CATALOG `table_store` WITH (\n" +
                    "'type' = 'table-store',\n" +
                    "'warehouse' = '/tmp/table-store-101'\n" +
                    ")";
                    // executeSql runs one statement at a time, so USE CATALOG is a separate statement
                    String useCatalogDDL = "USE CATALOG `table_store`";



                    // target table (DWD)
                    String sinkDWDDDL =
                    "CREATE TABLE IF NOT EXISTS dwd_lineitem (\n" +
                    " l_orderkey INT NOT NULL,\n" +
                    " l_partkey INT NOT NULL,\n" +
                    " l_suppkey INT NOT NULL,\n" +
                    " l_linenumber INT NOT NULL,\n" +
                    " l_quantity DECIMAL(15, 2) NOT NULL,\n" +
                    " l_extendedprice DECIMAL(15, 2) NOT NULL,\n" +
                    " l_discount DECIMAL(15, 2) NOT NULL,\n" +
                    " l_tax DECIMAL(15, 2) NOT NULL,\n" +
                    " l_returnflag CHAR(1) NOT NULL,\n" +
                    " l_linestatus CHAR(1) NOT NULL,\n" +
                    " l_shipdate DATE NOT NULL,\n" +
                    " l_commitdate DATE NOT NULL,\n" +
                    " l_receiptdate DATE NOT NULL,\n" +
                    " l_shipinstruct CHAR(25) NOT NULL,\n" +
                    " l_shipmode CHAR(10) NOT NULL,\n" +
                    " l_comment VARCHAR(44) NOT NULL,\n" +
                    " l_year BIGINT NOT NULL,\n" +
                    " l_month BIGINT NOT NULL,\n" +
                    " PRIMARY KEY (l_orderkey, l_linenumber, l_year, l_month) NOT ENFORCED \n" +
                    " ) PARTITIONED BY (l_year, l_month) WITH (
                    " -- 每个 partition 下设置 2 个 bucket
                    " 'bucket' = '2',\n" +
                    " -- 设置 changelog-producer 为 'input',这会使得上游 CDC Source 不丢弃 update_before,并且下游消费 dwd_lineitem 时没有 changelog-normalize 节点
                    " 'changelog-producer' = 'input' \n" +
                    " );"


                    // target table (ADS)
                    String sinkADSDDL =
                    "CREATE TABLE IF NOT EXISTS ads_pricing_summary (\n" +
                    "l_returnflag CHAR(1) NOT NULL,\n" +
                    "l_linestatus CHAR(1) NOT NULL,\n" +
                    "sum_quantity DOUBLE NOT NULL,\n" +
                    "sum_base_price DOUBLE NOT NULL,\n" +
                    "sum_discount_price DOUBLE NOT NULL,\n" +
                    "sum_charge_vat_inclusive DOUBLE NOT NULL,\n" +
                    "avg_quantity DOUBLE NOT NULL,\n" +
                    "avg_base_price DOUBLE NOT NULL,\n" +
                    "avg_discount DOUBLE NOT NULL,\n" +
                    "count_order BIGINT NOT NULL \n" +
                    ") WITH ( \n" +
                    "'bucket' = '2'\n" +
                    ");"




                    // transformations: project ODS into DWD, then aggregate DWD into ADS
                    String transformSQL1 =
                    "INSERT INTO dwd_lineitem SELECT l_orderkey,l_partkey,l_suppkey,l_linenumber,l_quantity,l_extendedprice,l_discount,\n" +
                                     "l_tax,l_returnflag,l_linestatus,l_shipdate,l_commitdate,l_receiptdate,l_shipinstruct,l_shipmode,l_comment,\n" +
                    "YEAR(l_shipdate) AS l_year,MONTH(l_shipdate) AS l_month FROM ods_lineitem;"

                    String transformSQL2=
                    "INSERT INTO ads_pricing_summary SELECT l_returnflag,l_linestatus,SUM(l_quantity) AS sum_quantity,\n" +
                    "SUM(l_extendedprice) AS sum_base_price,SUM(l_extendedprice * (1-l_discount)) AS sum_discount_price,\n" +
                    "SUM(l_extendedprice * (1-l_discount) * (1+l_tax)) AS sum_charge_vat_inclusive,AVG(l_quantity) AS avg_quantity,\n" +
                    "AVG(l_extendedprice) AS avg_base_price,AVG(l_discount) AS avg_discount,COUNT(*) AS count_order FROM dwd_lineitem \n" +
                    "WHERE (l_year < 1998 OR (l_year = 1998 AND l_month<= 9)) AND l_shipdate <= DATE '1998-12-01' - INTERVAL '90' DAY \n" +
                    "GROUP BY l_returnflag,l_linestatus;"




                    // register the catalog and tables, then submit the INSERT pipelines;
                    // executeSql submits each job itself, so no env.execute() is needed
                    tableEnv.executeSql(createCatalogDDL);
                    tableEnv.executeSql(useCatalogDDL);
                    tableEnv.executeSql(sourceDDL);
                    tableEnv.executeSql(sinkDWDDDL);
                    tableEnv.executeSql(sinkADSDDL);


                    tableEnv.executeSql(transformSQL1);
                    tableEnv.executeSql(transformSQL2);
                    }
                    }

                    For details, see:

                      https://github.com/LadyForest/flink-table-store-101/blob/master/real-time-update/README.zh.md

