Flink CDC to Doris
Main class for syncing a whole database to Doris with Flink CDC:
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.sink.DorisSink;
import org.apache.doris.flink.sink.writer.SimpleStringSerializer;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.UUID;

/**
 * Synchronize the full database through flink cdc
 */
public class DatabaseFullSync {

    private static final Logger LOG = LoggerFactory.getLogger(DatabaseFullSync.class);

    private static String HOST = "127.0.0.1";
    private static String MYSQL_PASSWD = "password";
    private static int MYSQL_PORT = 3306;
    private static int DORIS_PORT = 8030;
    private static String MYSQL_USER = "root";
    private static String SYNC_DB = "test";
    private static String SYNC_TBLS = "test.*";
    private static String TARGET_DORIS_DB = "test";

    public static void main(String[] args) throws Exception {
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname(HOST)
                .port(MYSQL_PORT)
                .databaseList(SYNC_DB)     // set captured database
                .tableList(SYNC_TBLS)      // set captured table
                .username(MYSQL_USER)
                .password(MYSQL_PASSWD)
                .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // enable checkpoint
        env.enableCheckpointing(10000);

        DataStreamSource<String> cdcSource = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL CDC Source");

        // get table list
        List<String> tableList = getTableList();
        LOG.info("sync table list:{}", tableList);
        for (String tbl : tableList) {
            SingleOutputStreamOperator<String> filterStream = filterTableData(cdcSource, tbl);
            SingleOutputStreamOperator<String> cleanStream = clean(filterStream);
            DorisSink dorisSink = buildDorisSink(tbl);
            cleanStream.sinkTo(dorisSink).name("sink " + tbl);
        }
        env.execute("Full Database Sync");
    }

    /**
     * Get real data
     * {
     *   "before":null,
     *   "after":{
     *     "id":1,
     *     "name":"zhangsan-1",
     *     "age":18
     *   },
     *   "source":{
     *     "db":"test",
     *     "table":"test_1",
     *     ...
     *   },
     *   "op":"c",
     *   ...
     * }
     */
    private static SingleOutputStreamOperator<String> clean(SingleOutputStreamOperator<String> source) {
        return source.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String row, Collector<String> out) throws Exception {
                try {
                    JSONObject rowJson = JSON.parseObject(row);
                    String op = rowJson.getString("op");
                    // history (snapshot read), insert, update
                    if (Arrays.asList("r", "c", "u").contains(op)) {
                        out.collect(rowJson.getJSONObject("after").toJSONString());
                    } else {
                        LOG.info("filter other op:{}", op);
                    }
                } catch (Exception ex) {
                    LOG.warn("filter other format binlog:{}", row);
                }
            }
        });
    }

    /**
     * Divide according to tablename
     */
    private static SingleOutputStreamOperator<String> filterTableData(DataStreamSource<String> source, String table) {
        return source.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String row) throws Exception {
                try {
                    JSONObject rowJson = JSON.parseObject(row);
                    JSONObject source = rowJson.getJSONObject("source");
                    String tbl = source.getString("table");
                    return table.equals(tbl);
                } catch (Exception ex) {
                    ex.printStackTrace();
                    return false;
                }
            }
        });
    }

    /**
     * Get all MySQL tables that need to be synchronized
     */
    private static List<String> getTableList() {
        List<String> tables = new ArrayList<>();
        String sql = "SELECT TABLE_SCHEMA,TABLE_NAME FROM information_schema.tables WHERE TABLE_SCHEMA = '" + SYNC_DB + "'";
        List<JSONObject> tableList = JdbcUtil.executeQuery(HOST, MYSQL_PORT, MYSQL_USER, MYSQL_PASSWD, sql);
        for (JSONObject jsob : tableList) {
            String schemaName = jsob.getString("TABLE_SCHEMA");
            String tblName = jsob.getString("TABLE_NAME");
            String schemaTbl = schemaName + "." + tblName;
            if (schemaTbl.matches(SYNC_TBLS)) {
                tables.add(tblName);
            }
        }
        return tables;
    }

    /**
     * create doris sink
     */
    public static DorisSink buildDorisSink(String table) {
        DorisSink.Builder<String> builder = DorisSink.builder();
        DorisOptions.Builder dorisBuilder = DorisOptions.builder();
        dorisBuilder.setFenodes(HOST + ":" + DORIS_PORT)
                .setTableIdentifier(TARGET_DORIS_DB + "." + table)
                .setUsername("root")
                .setPassword("");

        Properties pro = new Properties();
        // json data format
        pro.setProperty("format", "json");
        pro.setProperty("read_json_by_line", "true");
        DorisExecutionOptions executionOptions = DorisExecutionOptions.builder()
                .setLabelPrefix("label-" + table + UUID.randomUUID()) // stream load label prefix
                .setStreamLoadProp(pro)
                .build();

        builder.setDorisReadOptions(DorisReadOptions.builder().build())
                .setDorisExecutionOptions(executionOptions)
                .setSerializer(new SimpleStringSerializer()) // serialize as string
                .setDorisOptions(dorisBuilder.build());
        return builder.build();
    }
}
The JdbcUtil class:
import com.alibaba.fastjson.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

public class JdbcUtil {

    static {
        try {
            Class.forName("com.mysql.jdbc.Driver");
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
    }

    private static final Logger LOG = LoggerFactory.getLogger(JdbcUtil.class);

    public static void main(String[] args) throws SQLException {
    }

    public static List<JSONObject> executeQuery(String hostUrl, int port, String user, String password, String sql) {
        List<JSONObject> beJson = new ArrayList<>();
        String connectionUrl = String.format("jdbc:mysql://%s:%s/", hostUrl, port);
        Connection con = null;
        try {
            con = DriverManager.getConnection(connectionUrl, user, password);
            PreparedStatement ps = con.prepareStatement(sql);
            ResultSet rs = ps.executeQuery();
            beJson = resultSetToJson(rs);
        } catch (SQLException e) {
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                con.close();
            } catch (Exception e) {
            }
        }
        return beJson;
    }

    private static List<JSONObject> resultSetToJson(ResultSet rs) throws SQLException {
        List<JSONObject> list = new ArrayList<>();
        ResultSetMetaData metaData = rs.getMetaData();
        int columnCount = metaData.getColumnCount();
        while (rs.next()) {
            JSONObject jsonObj = new JSONObject();
            for (int i = 1; i <= columnCount; i++) {
                String columnName = metaData.getColumnLabel(i);
                String value = rs.getString(columnName);
                jsonObj.put(columnName, value);
            }
            list.add(jsonObj);
        }
        return list;
    }
}
Flink CDC to StarRocks

The overall flow:
Flink CDC captures the MySQL data
Convert the captured CDC data to JSON
Extract the database, table, and row data from the JSON
Key the stream by database + table
Use a process function to handle each table's data, buffering it in state; when the buffer reaches a certain size or has been buffered for a certain time (a processing-time timer covers the time-based case), emit the batch downstream
In the sink, assemble the rows and write them to StarRocks via Stream Load
Main class: CdcToStarRocks
The pipeline is simple: source -> map -> keyBy -> process -> sink. The source reads the MySQL binlog (or full snapshot + incremental), map converts the data format, keyBy partitions the data by database name + table name, process batches the data, and the sink writes the data to StarRocks.
import com.venn.source.mysql.cdc.CommonStringDebeziumDeserializationSchema;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Properties;

/**
 * mysql cdc demo
 * <p>
 * sync a whole database to starrocks with cdc
 * <p>
 * Limitations:
 * 1. Schema changes on the source side are not yet propagated to the StarRocks tables
 * 2. For efficiency, whether a table exists on the target side is checked only the first time data for that table arrives; once a table has been judged not to exist, its subsequent data changes are ignored
 * 3. Tables that are not imported are only filtered in the sink, so the earlier operators still process them; consider filtering out tables that do not exist in the target database in the deserializer or in the map function
 */
public class CdcToStarRocks {

    // max rows per batch and max wait time per batch
    private static int batchSize = 10000;
    private static long batchInterval = 10 * 1000;

    public static void main(String[] args) throws Exception {
        String ip = "localhost";
        int port = 3306;
        String db = "venn";
        // String table = "venn.user_log";
        String table = "venn.*";
        String user = "root";
        String pass = "123456";

        String starrocksIp = "10.201.0.230";
        String starrocksPort = "29030";
        String starrocksLoadPort = "28030";
        String starrocksUser = "root";
        String starrocksPass = "123456";
        String starrocksDb = "test";

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        MySqlSource<String> sourceFunction = MySqlSource.<String>builder()
                .hostname(ip)
                .port(port)
                // capture all tables of the listed database(s)
                .databaseList(db)
                .tableList(table)
                .username(user)
                .password(pass)
                .startupOptions(StartupOptions.latest())
                // do not cache schema change
                // .includeSchemaChanges(false)
                // .startupOptions(StartupOptions.initial())
                // custom deserializer that parses the record into json
                .deserializer(new CommonStringDebeziumDeserializationSchema(ip, port))
                .build();

        env.fromSource(sourceFunction, WatermarkStrategy.noWatermarks(), "cdc").name("source").uid("source")
                // json string -> CdcRecord
                .map(new CdcStarMapFunction()).name("map")
                .keyBy(record -> record.getDb() + "_" + record.getTable())
                .process(new CdcStarProcessFunction(batchSize, batchInterval)).name("process").uid("process")
                .addSink(new StarRocksSink(starrocksIp, starrocksPort, starrocksLoadPort, starrocksUser, starrocksPass, starrocksDb)).name("sink");

        env.execute("cdcToStarRocks");
    }
}
Deserializer: CommonStringDebeziumDeserializationSchema
The deserializer reuses a generic Flink CDC deserializer written earlier. It implements DebeziumDeserializationSchema and mainly extracts the database, table, operation type, and row data from each record. Pay special attention to the following:
For insert operations, only the data in after is needed
For update operations, both before and after are parsed: before holds the pre-update values and after the post-update values; if the pre-update values are not needed, only after has to be read
For delete operations, the data in before is needed
DDL operations need special handling (not covered here)
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.google.gson.JsonObject;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

/**
 * deserialize debezium format binlog
 */
public class CommonStringDebeziumDeserializationSchema implements DebeziumDeserializationSchema<String> {

    private String host;
    private int port;

    public CommonStringDebeziumDeserializationSchema(String host, int port) {
        this.host = host;
        this.port = port;
    }

    public void deserialize(SourceRecord record, Collector<String> out) {
        JsonObject jsonObject = new JsonObject();
        String binlog = record.sourceOffset().get("file").toString();
        String offset = record.sourceOffset().get("pos").toString();
        String ts_sec = record.sourceOffset().get("ts_sec").toString();
        // System.out.println("binlog : " + binlog + ", offset = " + offset);
        // todo get schema change
        jsonObject.addProperty("host", host);
        // add meta
        jsonObject.addProperty("binlog", binlog);
        jsonObject.addProperty("offset", offset);
        jsonObject.addProperty("ts_sec", ts_sec);
        jsonObject.addProperty("port", port);
        jsonObject.addProperty("file", (String) record.sourceOffset().get("file"));
        jsonObject.addProperty("pos", (Long) record.sourceOffset().get("pos"));
        jsonObject.addProperty("ts_sec", (Long) record.sourceOffset().get("ts_sec"));

        String[] name = record.valueSchema().name().split("\\.");
        jsonObject.addProperty("db", name[1]);
        jsonObject.addProperty("table", name[2]);

        Struct value = ((Struct) record.value());
        String operatorType = value.getString("op");
        jsonObject.addProperty("operator_type", operatorType);
        // c : create, u: update, d: delete, r: read
        // insert & update: take "after"
        if (!"d".equals(operatorType)) {
            Struct after = value.getStruct("after");
            JsonObject afterJsonObject = parseRecord(after);
            jsonObject.add("after", afterJsonObject);
        }
        // update & delete: take "before"
        if ("u".equals(operatorType) || "d".equals(operatorType)) {
            Struct source = value.getStruct("before");
            JsonObject beforeJsonObject = parseRecord(source);
            jsonObject.add("before", beforeJsonObject);
        }
        jsonObject.addProperty("parse_time", System.currentTimeMillis() / 1000);

        out.collect(jsonObject.toString());
    }

    private JsonObject parseRecord(Struct after) {
        JsonObject jo = new JsonObject();
        for (Field field : after.schema().fields()) {
            switch ((field.schema()).type()) {
                case INT8:
                    int resultInt8 = after.getInt8(field.name());
                    jo.addProperty(field.name(), resultInt8);
                    break;
                case INT64:
                    Long resultInt = after.getInt64(field.name());
                    jo.addProperty(field.name(), resultInt);
                    break;
                case FLOAT32:
                    Float resultFloat32 = after.getFloat32(field.name());
                    jo.addProperty(field.name(), resultFloat32);
                    break;
                case FLOAT64:
                    Double resultFloat64 = after.getFloat64(field.name());
                    jo.addProperty(field.name(), resultFloat64);
                    break;
                case BYTES:
                    // json ignore byte column
                    byte[] resultByte = after.getBytes(field.name());
                    jo.addProperty(field.name(), String.valueOf(resultByte));
                    break;
                case STRING:
                    String resultStr = after.getString(field.name());
                    jo.addProperty(field.name(), resultStr);
                    break;
                default:
            }
        }
        return jo;
    }

    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}
Map function: CdcStarMapFunction
CdcStarMapFunction is straightforward: it converts the JSON string into a CdcRecord object, keeping only the database, table, operation type, and row data that are actually needed.
When extracting the row data, insert and update only take the values from after.
import java.util.LinkedHashMap;
import java.util.Map;
import lombok.Data;

/**
 * cdc record pojo
 */
@Data
public class CdcRecord {

    private String db;
    private String table;
    private String op;
    private Map<String, String> data = new LinkedHashMap<>();

    // constructor used by CdcStarMapFunction (@Data alone does not generate it)
    public CdcRecord(String db, String table, String op) {
        this.db = db;
        this.table = table;
        this.op = op;
    }
}
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

public class CdcStarMapFunction extends RichMapFunction<String, CdcRecord> {

    private final static Logger LOG = LoggerFactory.getLogger(CdcStarMapFunction.class);
    private JsonParser parser;

    @Override
    public void open(Configuration parameters) throws Exception {
        parser = new JsonParser();
    }

    @Override
    public CdcRecord map(String element) throws Exception {
        LOG.debug("data : {}", element);
        JsonObject object = parser.parse(element).getAsJsonObject();
        String db = object.get("db").getAsString();
        String table = object.get("table").getAsString();
        String op = object.get("operator_type").getAsString();
        CdcRecord record = new CdcRecord(db, table, op);

        // insert/update
        String dataLocation = "after";
        if ("d".equals(op)) {
            // if op is delete, get before
            dataLocation = "before";
        }
        JsonObject data = object.get(dataLocation).getAsJsonObject();
        for (Map.Entry<String, JsonElement> entry : data.entrySet()) {
            String columnName = entry.getKey();
            String columnValue;
            JsonElement value = entry.getValue();
            if (!value.isJsonNull()) {
                // if column value is not null, get as string
                columnValue = value.getAsString();
                // put column name/value to record.data
                record.getData().put(columnName, columnValue);
            }
        }
        return record;
    }
}
Process function: CdcStarProcessFunction
CdcStarProcessFunction has three parts:
Three pieces of state, cacheTimer, cacheSize and cache, hold the next timer firing time, the number of cached records, and the cached records
processElement handles each table's data. The first time a record for a table arrives, a processing-time timer is registered at current time + batchInterval. Records are added to cache; once the count reaches the configured batchSize, flushData emits the batch downstream, the previously registered timer is deleted, and the state is cleared
When the timer fires, flushData emits the batch downstream and clears the state
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class CdcStarProcessFunction extends KeyedProcessFunction<String, CdcRecord, List<CdcRecord>> {

    private final static Logger LOG = LoggerFactory.getLogger(CdcStarProcessFunction.class);

    private int batchSize;
    private long batchInterval;
    // next timer time
    private ValueState<Long> cacheTimer;
    // current cache size
    private ValueState<Integer> cacheSize;
    // cache data
    private ListState<CdcRecord> cache;

    public CdcStarProcessFunction(int batchSize, long batchInterval) {
        this.batchSize = batchSize;
        this.batchInterval = batchInterval;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        ListStateDescriptor<CdcRecord> cacheDescriptor = new ListStateDescriptor<>("cache", TypeInformation.of(CdcRecord.class));
        cache = getRuntimeContext().getListState(cacheDescriptor);

        ValueStateDescriptor<Integer> cacheSizeDescriptor = new ValueStateDescriptor<>("cacheSize", Integer.class);
        cacheSize = getRuntimeContext().getState(cacheSizeDescriptor);

        ValueStateDescriptor<Long> cacheTimerDescriptor = new ValueStateDescriptor<>("cacheTimer", Long.class);
        cacheTimer = getRuntimeContext().getState(cacheTimerDescriptor);
    }

    @Override
    public void processElement(CdcRecord element, KeyedProcessFunction<String, CdcRecord, List<CdcRecord>>.Context ctx, Collector<List<CdcRecord>> out) throws Exception {
        // cache size + 1
        if (cacheSize.value() != null) {
            cacheSize.update(cacheSize.value() + 1);
        } else {
            cacheSize.update(1);
            // first record for this key: add timer for interval flush
            long nextTimer = System.currentTimeMillis() + batchInterval;
            LOG.debug("register timer : {} , key : {}", nextTimer, ctx.getCurrentKey());
            cacheTimer.update(nextTimer);
            ctx.timerService().registerProcessingTimeTimer(nextTimer);
        }
        // add data to cache state
        cache.add(element);

        // cache size reached batchSize
        if (cacheSize.value() >= batchSize) {
            // remove the pending timer
            long nextTimer = cacheTimer.value();
            LOG.debug("{} remove timer, key : {}", nextTimer, ctx.getCurrentKey());
            ctx.timerService().deleteProcessingTimeTimer(nextTimer);
            // flush data to down stream
            flushData(out);
        }
    }

    /**
     * flush data to down stream
     */
    private void flushData(Collector<List<CdcRecord>> out) throws Exception {
        List<CdcRecord> tmpCache = new ArrayList<>();
        Iterator<CdcRecord> it = cache.get().iterator();
        while (it.hasNext()) {
            tmpCache.add(it.next());
        }
        if (tmpCache.size() > 0) {
            out.collect(tmpCache);
            // finished flushing all cached data, clear state
            cache.clear();
            cacheSize.clear();
            cacheTimer.clear();
        }
    }

    @Override
    public void onTimer(long timestamp, KeyedProcessFunction<String, CdcRecord, List<CdcRecord>>.OnTimerContext ctx, Collector<List<CdcRecord>> out) throws Exception {
        LOG.info("key {}, timer {} triggers flush data", ctx.getCurrentKey(), timestamp);
        // batch interval triggers flush data
        flushData(out);
    }

    @Override
    public void close() throws Exception {
    }
}
Sink function: StarRocksSink
StarRocksSink is a bit more involved. Based on the table name in the data, it fetches the corresponding table schema from the target database (the schema is cached in memory to avoid querying the database every time), reads each column value from the record in the target table's column order, and appends the record's operation type.
When writing to StarRocks, StarRocksSink supports both upsert and delete: each row is suffixed with 0/1, where 0 stands for an UPSERT and 1 for a DELETE.
See reference document 1.
The invoke method
The core logic of StarRocksSink lives in the invoke method:
Get the database and table from the data and concatenate them into a key
Get the target table's schema (whole-database mapping, so source and target table names are identical): look it up in the cache first and load it from the database if it is not there
Assemble the data
Build the load URL
Write the data to StarRocks over HTTP
The loadTargetTableSchema method
Executes desc db.table
Gets the target table's schema and assembles it into two forms: all column names joined with "," with "__op" appended, used in the HTTP header to describe the columns of the data; and all columns added to a list in order, used to pick the corresponding values out of the source data
The parseUploadData method
Using the target table's column order, take the corresponding column values from the data, join them with the column separator, and finally append 0/1 depending on the operation type: 1 for delete, 0 for everything else
The doHttp method
Writes the data to StarRocks over HTTP; nothing special here
import org.apache.commons.codec.binary.Base64;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.http.HttpException;
import org.apache.http.HttpHeaders;
import org.apache.http.HttpRequest;
import org.apache.http.HttpRequestInterceptor;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.DefaultRedirectStrategy;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.protocol.HTTP;
import org.apache.http.protocol.HttpContext;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.sql.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class StarRocksSink extends RichSinkFunction<List<CdcRecord>> {

    private final static Logger LOG = LoggerFactory.getLogger(StarRocksSink.class);

    public final static String COL_SEP = "\\\\x01";
    public final static String ROW_SEP = "\\\\x02";
    public final static String NULL_COL = "\\N";

    private String ip;
    private String port;
    private String loadPort;
    private String user;
    private String pass;
    private String db;
    private Connection connection;
    private Map<String, String> spliceColumnMap = new HashMap<>();
    private Map<String, List<String>> columnMap = new HashMap<>();

    public StarRocksSink() {
    }

    public StarRocksSink(String ip, String port, String loadPort, String user, String pass, String db) {
        this.ip = ip;
        this.port = port;
        this.loadPort = loadPort;
        this.user = user;
        this.pass = pass;
        this.db = db;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        reConnect();
    }

    @Override
    public void invoke(List<CdcRecord> element, Context context) throws Exception {
        if (element == null || element.size() == 0) {
            LOG.info("ignore empty element");
            return;
        }
        LOG.info("write batch size: " + element.size());
        // use StarRocks db name
        // String db = cache.get(0).getDb();
        String table = element.get(0).getTable();
        String key = db + "_" + table;
        // get table schema
        List<String> columnList;
        if (!columnMap.containsKey(key)) {
            // db.table is coming for the first time: load the columns, put them into spliceColumnMap & columnMap
            loadTargetTableSchema(key, db, table);
        }
        String columns = spliceColumnMap.get(key);
        columnList = columnMap.get(key);
        if (columnList.size() == 0) {
            LOG.info("{}.{} not exists in target starrocks, ignore data change", db, table);
            return;
        }
        // make up data
        String data = parseUploadData(element, columnList);
        final String loadUrl = String.format("http://%s:%s/api/%s/%s/_stream_load", ip, loadPort, db, table);
        String label = db + "_" + table + "_" + System.currentTimeMillis();
        // send data to starrocks
        doHttp(loadUrl, data, label, columns);
    }

    /**
     * send data to starrocks over http
     */
    private void doHttp(String loadUrl, String data, String label, String columns) throws IOException, SQLException {
        final HttpClientBuilder httpClientBuilder = HttpClients.custom()
                .setRedirectStrategy(new DefaultRedirectStrategy() {
                    @Override
                    protected boolean isRedirectable(String method) {
                        return true;
                    }
                })
                .addInterceptorFirst(new ContentLengthHeaderRemover());

        try (CloseableHttpClient client = httpClientBuilder.build()) {
            HttpPut put = new HttpPut(loadUrl);
            StringEntity entity = new StringEntity(data, "UTF-8");
            put.setHeader(HttpHeaders.EXPECT, "100-continue");
            put.setHeader(HttpHeaders.AUTHORIZATION, basicAuthHeader(user, pass));
            // the label header is optional, not necessary
            // using the label header can ensure at-most-once semantics
            put.setHeader("label", label);
            put.setHeader("columns", columns);
            put.setHeader("row_delimiter", ROW_SEP);
            put.setHeader("column_separator", COL_SEP);
            put.setEntity(entity);

            try (CloseableHttpResponse response = client.execute(put)) {
                String loadResult = "";
                if (response.getEntity() != null) {
                    loadResult = EntityUtils.toString(response.getEntity());
                }
                final int statusCode = response.getStatusLine().getStatusCode();
                // status code 200 just indicates that the starrocks be service is ok, not that the stream load succeeded
                // check the response content to find out whether the stream load succeeded
                if (statusCode != 200) {
                    throw new IOException(String.format("Stream load failed, statusCode=%s load result=%s", statusCode, loadResult));
                }
            }
        }
    }

    private String basicAuthHeader(String username, String password) {
        final String tobeEncode = username + ":" + password;
        byte[] encoded = Base64.encodeBase64(tobeEncode.getBytes(StandardCharsets.UTF_8));
        return "Basic " + new String(encoded);
    }

    private String parseUploadData(List<CdcRecord> cache, List<String> columnList) {
        StringBuilder builder = new StringBuilder();
        for (CdcRecord element : cache) {
            Map<String, String> data = element.getData();
            for (String column : columnList) {
                if (data.containsKey(column)) {
                    builder.append(data.get(column)).append(COL_SEP);
                } else {
                    // if target column not exists in source data, set as null
                    builder.append(NULL_COL).append(COL_SEP);
                }
            }
            // add __op: 1 for delete, 0 for upsert
            if ("d".equals(element.getOp())) {
                builder.append("1");
            } else {
                builder.append("0");
            }
            // add row separator
            builder.append(ROW_SEP);
        }
        // remove the last row separator
        builder = builder.delete(builder.length() - ROW_SEP.length(), builder.length());
        String data = builder.toString();
        return data;
    }

    /**
     * load table schema, parse to http column header and column list for picking values from source data
     */
    private void loadTargetTableSchema(String key, String db, String table) throws SQLException {
        List<String> columnList = new ArrayList<>();
        StringBuilder builer = new StringBuilder();
        try {
            // load table schema
            PreparedStatement insertPS = connection.prepareStatement("desc " + db + "." + table);
            ResultSet result = insertPS.executeQuery();
            while (result.next()) {
                String column = result.getString(1);
                builer.append(column).append(",");
                columnList.add(column);
            }
        } catch (SQLException e) {
            LOG.warn("load {}.{} schema error. {}", db, table, e.getStackTrace());
        }
        builer.append("__op");
        String columns = builer.toString();
        spliceColumnMap.put(key, columns);
        columnMap.put(key, columnList);
    }

    /**
     * reconnect to starrocks
     *
     * @throws SQLException
     */
    private void reConnect() throws SQLException {
        String driver = "jdbc:mysql://" + ip + ":" + port;
        if (connection == null || connection.isClosed()) {
            connection = DriverManager.getConnection(driver, user, pass);
        }
    }

    @Override
    public void finish() throws Exception {
        LOG.info("finish");
    }

    @Override
    public void close() throws Exception {
        LOG.info("close...");
        connection.close();
    }

    private static class ContentLengthHeaderRemover implements HttpRequestInterceptor {
        @Override
        public void process(HttpRequest request, HttpContext context) throws HttpException, IOException {
            // avoid org.apache.http.protocol.RequestContent's ProtocolException("Content-Length header already present")
            request.removeHeaders(HTTP.CONTENT_LEN);
        }
    }
}
Limitations:
Schema changes on the source side are not yet propagated to the StarRocks tables
For efficiency, whether a table exists on the target side is checked only the first time data for that table arrives; once a table has been judged not to exist, its subsequent data changes are simply ignored
Tables that are not imported are only filtered out in the sink, so all the earlier operators still process their data; consider filtering out data for tables that do not exist in the target database in the deserializer or the map function instead, as in the sketch below
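A minimal sketch of that last idea (not part of the original post): a RichFilterFunction placed between the map and the keyBy that drops records whose table is missing on the target side. The class name ExistingTableFilterFunction and the information_schema query are illustrative assumptions; it relies on the StarRocks FE query port being MySQL-protocol compatible.

import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.configuration.Configuration;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.HashSet;
import java.util.Set;

public class ExistingTableFilterFunction extends RichFilterFunction<CdcRecord> {

    private final String jdbcUrl;   // e.g. jdbc:mysql://starrocksIp:starrocksPort
    private final String user;
    private final String pass;
    private final String targetDb;
    private transient Set<String> existingTables;

    public ExistingTableFilterFunction(String jdbcUrl, String user, String pass, String targetDb) {
        this.jdbcUrl = jdbcUrl;
        this.user = user;
        this.pass = pass;
        this.targetDb = targetDb;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        // load the target table names once at startup (assumption: the target side exposes information_schema over JDBC)
        existingTables = new HashSet<>();
        try (Connection con = DriverManager.getConnection(jdbcUrl, user, pass);
             PreparedStatement ps = con.prepareStatement(
                     "SELECT TABLE_NAME FROM information_schema.tables WHERE TABLE_SCHEMA = ?")) {
            ps.setString(1, targetDb);
            try (ResultSet rs = ps.executeQuery()) {
                while (rs.next()) {
                    existingTables.add(rs.getString(1));
                }
            }
        }
    }

    @Override
    public boolean filter(CdcRecord record) {
        // keep only records whose table exists on the target side
        return existingTables.contains(record.getTable());
    }
}

In CdcToStarRocks it could then be wired in between map and keyBy, e.g. .map(new CdcStarMapFunction()).name("map").filter(new ExistingTableFilterFunction("jdbc:mysql://" + starrocksIp + ":" + starrocksPort, starrocksUser, starrocksPass, starrocksDb)) before the keyBy, so process and sink never see the unwanted tables.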
Flink CDC to Table Store
package name.lijiaqi;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class MysqlToHudiExample {
    public static void main(String[] args) throws Exception {
        EnvironmentSettings fsSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, fsSettings);
        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);

        // source table (MySQL CDC)
        String sourceDDL =
                "CREATE TEMPORARY TABLE ods_lineitem (\n" +
                "  l_orderkey INT NOT NULL,\n" +
                "  l_partkey INT NOT NULL,\n" +
                "  l_suppkey INT NOT NULL,\n" +
                "  l_linenumber INT NOT NULL,\n" +
                "  l_quantity DECIMAL(15, 2) NOT NULL,\n" +
                "  l_extendedprice DECIMAL(15, 2) NOT NULL,\n" +
                "  l_discount DECIMAL(15, 2) NOT NULL,\n" +
                "  l_tax DECIMAL(15, 2) NOT NULL,\n" +
                "  l_returnflag CHAR(1) NOT NULL,\n" +
                "  l_linestatus CHAR(1) NOT NULL,\n" +
                "  l_shipdate DATE NOT NULL,\n" +
                "  l_commitdate DATE NOT NULL,\n" +
                "  l_receiptdate DATE NOT NULL,\n" +
                "  l_shipinstruct CHAR(25) NOT NULL,\n" +
                "  l_shipmode CHAR(10) NOT NULL,\n" +
                "  l_comment VARCHAR(44) NOT NULL,\n" +
                "  PRIMARY KEY (l_orderkey, l_linenumber) NOT ENFORCED\n" +
                ") WITH (\n" +
                "  'connector' = 'mysql-cdc',\n" +
                // to use a hostname instead of the IP, add "127.0.0.1 mysql.docker.internal" to the host's /etc/hosts
                "  'hostname' = '127.0.0.1',\n" +
                "  'port' = '3307',\n" +
                "  'username' = 'flink',\n" +
                "  'password' = 'flink',\n" +
                "  'database-name' = 'tpch_s10',\n" +
                "  'table-name' = 'lineitem'\n" +
                ")";

        // Table Store catalog
        String tableSourceDDL =
                "CREATE CATALOG `table_store` WITH (\n" +
                "  'type' = 'table-store',\n" +
                "  'warehouse' = '/tmp/table-store-101'\n" +
                ")";

        // target DWD table
        String sinkDWDDDL =
                "CREATE TABLE IF NOT EXISTS dwd_lineitem (\n" +
                "  l_orderkey INT NOT NULL,\n" +
                "  l_partkey INT NOT NULL,\n" +
                "  l_suppkey INT NOT NULL,\n" +
                "  l_linenumber INT NOT NULL,\n" +
                "  l_quantity DECIMAL(15, 2) NOT NULL,\n" +
                "  l_extendedprice DECIMAL(15, 2) NOT NULL,\n" +
                "  l_discount DECIMAL(15, 2) NOT NULL,\n" +
                "  l_tax DECIMAL(15, 2) NOT NULL,\n" +
                "  l_returnflag CHAR(1) NOT NULL,\n" +
                "  l_linestatus CHAR(1) NOT NULL,\n" +
                "  l_shipdate DATE NOT NULL,\n" +
                "  l_commitdate DATE NOT NULL,\n" +
                "  l_receiptdate DATE NOT NULL,\n" +
                "  l_shipinstruct CHAR(25) NOT NULL,\n" +
                "  l_shipmode CHAR(10) NOT NULL,\n" +
                "  l_comment VARCHAR(44) NOT NULL,\n" +
                "  l_year BIGINT NOT NULL,\n" +
                "  l_month BIGINT NOT NULL,\n" +
                "  PRIMARY KEY (l_orderkey, l_linenumber, l_year, l_month) NOT ENFORCED\n" +
                ") PARTITIONED BY (l_year, l_month) WITH (\n" +
                // 2 buckets per partition
                "  'bucket' = '2',\n" +
                // set changelog-producer to 'input' so the upstream CDC source does not discard update_before
                // and downstream consumers of dwd_lineitem need no changelog-normalize node
                "  'changelog-producer' = 'input'\n" +
                ")";

        // target ADS table
        String sinkADSDDL =
                "CREATE TABLE IF NOT EXISTS ads_pricing_summary (\n" +
                "  l_returnflag CHAR(1) NOT NULL,\n" +
                "  l_linestatus CHAR(1) NOT NULL,\n" +
                "  sum_quantity DOUBLE NOT NULL,\n" +
                "  sum_base_price DOUBLE NOT NULL,\n" +
                "  sum_discount_price DOUBLE NOT NULL,\n" +
                "  sum_charge_vat_inclusive DOUBLE NOT NULL,\n" +
                "  avg_quantity DOUBLE NOT NULL,\n" +
                "  avg_base_price DOUBLE NOT NULL,\n" +
                "  avg_discount DOUBLE NOT NULL,\n" +
                "  count_order BIGINT NOT NULL\n" +
                ") WITH (\n" +
                "  'bucket' = '2'\n" +
                ")";

        // simple transformation into the DWD table
        String transformSQL1 =
                "INSERT INTO dwd_lineitem SELECT l_orderkey,l_partkey,l_suppkey,l_linenumber,l_quantity,l_extendedprice,l_discount,\n" +
                "l_tax,l_returnflag,l_linestatus,l_shipdate,l_commitdate,l_receiptdate,l_shipinstruct,l_shipmode,l_comment,\n" +
                "YEAR(l_shipdate) AS l_year,MONTH(l_shipdate) AS l_month FROM ods_lineitem";

        // pricing summary aggregation into the ADS table
        String transformSQL2 =
                "INSERT INTO ads_pricing_summary SELECT l_returnflag,l_linestatus,SUM(l_quantity) AS sum_quantity,\n" +
                "SUM(l_extendedprice) AS sum_base_price,SUM(l_extendedprice * (1-l_discount)) AS sum_discount_price,\n" +
                "SUM(l_extendedprice * (1-l_discount) * (1+l_tax)) AS sum_charge_vat_inclusive,AVG(l_quantity) AS avg_quantity,\n" +
                "AVG(l_extendedprice) AS avg_base_price,AVG(l_discount) AS avg_discount,COUNT(*) AS count_order FROM dwd_lineitem \n" +
                "WHERE (l_year < 1998 OR (l_year = 1998 AND l_month <= 9)) AND l_shipdate <= DATE '1998-12-01' - INTERVAL '90' DAY \n" +
                "GROUP BY l_returnflag,l_linestatus";

        // create the Table Store catalog and tables, then submit the INSERT statements
        tableEnv.executeSql(tableSourceDDL);
        // executeSql runs one statement at a time, so switch catalog with a separate call
        tableEnv.executeSql("USE CATALOG `table_store`");
        tableEnv.executeSql(sourceDDL);
        tableEnv.executeSql(sinkDWDDDL);
        tableEnv.executeSql(sinkADSDDL);
        TableResult result1 = tableEnv.executeSql(transformSQL1);
        TableResult result2 = tableEnv.executeSql(transformSQL2);
        // executeSql already submits the INSERT jobs; no env.execute() is needed here
    }
}
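The two INSERT statements keep dwd_lineitem and ads_pricing_summary continuously updated. To inspect the aggregation result, a separate batch query can be run against the same Table Store catalog; the sketch below is not part of the original example, and the class name QueryPricingSummary is made up for illustration (it only assumes the same catalog type and warehouse path as above).

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class QueryPricingSummary {
    public static void main(String[] args) {
        // separate batch session for ad-hoc inspection of the Table Store tables
        TableEnvironment batchEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
        batchEnv.executeSql(
                "CREATE CATALOG `table_store` WITH (\n" +
                "  'type' = 'table-store',\n" +
                "  'warehouse' = '/tmp/table-store-101'\n" +
                ")");
        batchEnv.executeSql("USE CATALOG `table_store`");
        // print the latest pricing summary produced by the streaming job
        batchEnv.executeSql("SELECT * FROM ads_pricing_summary").print();
    }
}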
For details, see:
https://github.com/LadyForest/flink-table-store-101/blob/master/real-time-update/README.zh.md




