
Applying Flink CDC to Database Synchronization

Original article by 途er · 2022-12-13

Project Background

The traffic replay platform places stringent consistency requirements on the recording-environment and replay-environment databases: the data gap between them must stay within 10 minutes, and synchronization tasks run frequently. mysqldump, the backup tool that ships with MySQL, can export a database to a .sql file on disk; the file consists of SQL INSERT statements that can later be replayed for recovery. But a full export-and-restore cycle takes far longer than 10 minutes, and the procedure is complex enough to require a DBA's assistance, which makes frequent synchronization tasks a burden on the DBAs. The company's existing synchronization tool, outter, does not support full-snapshot synchronization, lags badly on large data volumes, and frequently OOMs when synchronizing wide tables. And neither approach supports tenant-level synchronization. We therefore had to build a database synchronization tool of our own, tailored to the traffic replay platform.

Requirements Analysis

Goals

The traffic replay platform imposes the following requirements on the synchronization tool:

  1. When recording starts, the data gap between the source database (recording environment) and the target database (replay environment) must be within 10 minutes;
  2. Both utf-8 and ucs2 character sets must be supported (some early wide tables use ucs2 and must remain compatible);
  3. The source and target tables to synchronize must be freely selectable, and the database names may differ;
  4. Because sharding differs between environments, multiple source databases must be mergeable into a single target database;
  5. Tenant-level data must be synchronizable from the source database to the target;
  6. Incremental synchronization from the source database to the target database must be supported;
  7. A single task or multiple tasks must be able to synchronize several databases on the same instance;
  8. Running a synchronization task should take only a few simple steps, so that frequent synchronization tasks stay practical.

Choosing a Technical Approach

An Introduction to CDC

CDC stands for Change Data Capture, an umbrella term for techniques that capture incremental data changes. Today it mainly refers to capturing database change events, and it is applied in scenarios such as data synchronization, disaster recovery, data distribution, and data integration.

The CDC solutions popular in the industry fall into two broad families:

Query-based CDC
  • An offline scheduler issues query jobs over JDBC; the server parses, optimizes, and executes the query SQL and returns the result set;
  • Offline scheduling inherently cannot guarantee consistency or real-time freshness, because the data may change many times while a query runs;
  • It is intrusive to the source database and adds I/O pressure, so the load on the source must be considered carefully: coordinate execution times and parallelism, or synchronize from a replica instead;
Log-based CDC
  • Consumes the log in real time as a stream. For example, MySQL's binlog records every data change completely, so the binlog file can serve as a streaming data source;
  • Consistency is guaranteed, because the log file is a complete record of the data changes;
  • Freshness is guaranteed, because a log like the binlog can be consumed as a stream of real-time data; as long as processing keeps up, the source and target databases stay synchronized at second-level latency.

Comparing the Candidates

The common open-source CDC database synchronization solutions compare as follows:

[Table: comparison of common open-source CDC solutions]

Settling on the Approach

From the comparison above we can draw the following conclusions:

  1. Sqoop, Flink CDC, Debezium, and Oracle GoldenGate all do well at full + incremental synchronization;
  2. Of those, Flink CDC, Debezium, and Oracle GoldenGate support resuming from a breakpoint;
  3. Oracle GoldenGate mainly targets Oracle; Flink CDC is built on Debezium with substantial extensions and enhancements, has a distributed architecture, is written in Java, and has a fairly active community.

Based on this we decided to build the traffic replay platform's database synchronization tool on Flink CDC (with the constraint that every table must have a primary key).
For an introduction to Flink CDC itself, interested readers can click here.

Implementation

With the approach settled, we built our customizations on top of Flink CDC's existing capabilities, mainly in the following areas:

1. Parse the data read by Flink CDC with a custom deserialization scheme

Flink CDC deserializes with JsonDebeziumDeserializationSchema by default, rendering each database change as JSON, which did not fit our needs, so we defined our own deserializer, SqlDeserializationSchema, in which we implemented the following:
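For orientation before the individual steps: a custom deserializer plugs into Flink CDC by implementing the DebeziumDeserializationSchema interface. The skeleton below is a minimal sketch of the shape of SqlDeserializationSchema; the toOperateSqlModel helper is a hypothetical stand-in for the assembly logic covered by steps 1-6.

import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

public class SqlDeserializationSchema implements DebeziumDeserializationSchema<OperateSqlModel> {

    // tenant-filter fields and constructor omitted here; see step 5

    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<OperateSqlModel> out) {
        Struct valueStruct = (Struct) sourceRecord.value();
        // Steps 1-6 below fill in this method: classify the operation, filter by
        // tenant, assemble the SQL for the target database, then emit the result.
        OperateSqlModel model = toOperateSqlModel(valueStruct); // hypothetical helper
        if (model != null) {
            out.collect(model);
        }
    }

    /** Hypothetical stand-in for the assembly logic detailed in steps 1-6. */
    private OperateSqlModel toOperateSqlModel(Struct valueStruct) {
        return null; // see dmlHandle(...) and DebeziumMakingSql below
    }

    @Override
    public TypeInformation<OperateSqlModel> getProducedType() {
        return TypeInformation.of(OperateSqlModel.class);
    }
}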

1. Split all operations into DDL and DML statements

Envelope.Operation operation = Envelope.Operation.forCode((String) valueStruct.get(OPERATION));
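The op field only exists on data-change records; as the schema-change samples further down show, DDL events instead carry a historyRecord field whose JSON contains the DDL text. One way to separate the two classes is the following sketch:

Struct valueStruct = (Struct) sourceRecord.value();
if (valueStruct.schema().field("historyRecord") != null) {
    // DDL event: the statement text sits inside the historyRecord JSON ("ddl" attribute)
} else {
    // DML event: dispatch on the op code as above
    Envelope.Operation operation =
            Envelope.Operation.forCode(valueStruct.getString(Envelope.FieldName.OPERATION));
}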

2. Subdivide the DML statements into the different types of SQL operations

public enum MysqlOperate {

    // DDL statement
    DDL,
    // DML statements
    INSERT,
    UPDATE,
    DELETE,
    // not recognized
    UNDEFINE,
    ;

}

With query-based CDC every record is turned into an INSERT; with log-based CDC the captured operations are turned into INSERT, UPDATE, and DELETE statements.
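The dispatch from Debezium's op codes onto these types can be sketched as follows (the codes match the sample records below: r = snapshot read, c = create, u = update, d = delete):

private MysqlOperate toMysqlOperate(Envelope.Operation op) {
    switch (op) {
        case READ:    // op=r, a row produced by the initial snapshot query
        case CREATE:  // op=c, an INSERT captured from the binlog
            return MysqlOperate.INSERT;
        case UPDATE:  // op=u
            return MysqlOperate.UPDATE;
        case DELETE:  // op=d
            return MysqlOperate.DELETE;
        default:
            return MysqlOperate.UNDEFINE;
    }
}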

3. UPDATE and DELETE must operate on rows through the primary key

List<Field> primaryKeys = ((Struct) sourceRecord.key()).schema().fields();

4. Analyze the record formats Flink CDC hands us, one per change type

  • Query-based CDC: a record read by the JDBC snapshot query (op=r)
{"before":null,"after":{"config_id":1,"config_name":"Timothy Mendoza","config_key":"Paai6QCGyu","config_value":"RSVnKEpg7H","config_type":"U","create_by":"LnwDJwQL3l","create_time":997278609000,"update_by":"2008-09-20","update_time":1283015983000,"remark":"SvhrczrD0p"},"source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":0,"snapshot":"false","db":"xsy_flowreplay","sequence":null,"table":"sys_config_copy1","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1650611958990,"transaction":null}

  • Log-based CDC data-change record: INSERT (op=c)
{"before":null,"after":{"config_id":2,"config_name":"Timothy Mendoza","config_key":"Paai6QCGyu","config_value":"RSVnKEpg7H","config_type":"U","create_by":"LnwDJwQL3l","create_time":997278609000,"update_by":"2008-09-20","update_time":1283015983000,"remark":"SvhrczrD0p"},"source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1650612061000,"snapshot":"false","db":"xsy_flowreplay","sequence":null,"table":"sys_config_copy1","server_id":1,"gtid":null,"file":"binlog.000007","pos":653648221,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1650612061677,"transaction":null}
  • Log-based CDC data-change record: UPDATE (op=u)
{"before":{"config_id":1,"config_name":"Timothy Mendoza","config_key":"Paai6QCGyu","config_value":"RSVnKEpg7H","config_type":"U","create_by":"LnwDJwQL3l","create_time":997278609000,"update_by":"2008-09-20","update_time":1283015983000,"remark":"SvhrczrD0p"},"after":{"config_id":1,"config_name":"likenan","config_key":"Paai6QCGyu","config_value":"RSVnKEpg7H","config_type":"U","create_by":"LnwDJwQL3l","create_time":997278609000,"update_by":"2008-09-20","update_time":1283015983000,"remark":"SvhrczrD0p"},"source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1650612078000,"snapshot":"false","db":"xsy_flowreplay","sequence":null,"table":"sys_config_copy1","server_id":1,"gtid":null,"file":"binlog.000007","pos":653648657,"row":0,"thread":null,"query":null},"op":"u","ts_ms":1650612078963,"transaction":null}
  • Log-based CDC data-change record: DELETE (op=d)
{"before":{"config_id":1,"config_name":"tangrui","config_key":"Paai6QCGyu","config_value":"RSVnKEpg7H","config_type":"U","create_by":"LnwDJwQL3l","create_time":997278609000,"update_by":"2008-09-20","update_time":1283015983000,"remark":"SvhrczrD0p"},"after":null,"source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1650612156000,"snapshot":"false","db":"xsy_flowreplay","sequence":null,"table":"sys_config_copy1","server_id":1,"gtid":null,"file":"binlog.000007","pos":653649872,"row":0,"thread":null,"query":null},"op":"d","ts_ms":1650612157103,"transaction":null}
  • Log-based CDC schema-change record: adding a column
{"source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1650612638787,"snapshot":"false","db":"xsy_flowreplay","sequence":null,"table":"sys_config_copy1","server_id":1,"gtid":null,"file":"binlog.000007","pos":653652112,"row":0,"thread":null,"query":null},"historyRecord":"{\"source\":{\"file\":\"binlog.000007\",\"pos\":653652112,\"server_id\":1},\"position\":{\"transaction_id\":null,\"ts_sec\":1650612638,\"file\":\"binlog.000007\",\"pos\":653652319,\"server_id\":1},\"databaseName\":\"xsy_flowreplay\",\"ddl\":\"ALTER TABLE `xsy_flowreplay`.`sys_config_copy1` ADD COLUMN `test1` varchar(255) NULL AFTER `remark`\",\"tableChanges\":[{\"type\":\"ALTER\",\"id\":\"\\\"xsy_flowreplay\\\".\\\"sys_config_copy1\\\"\",\"table\":{\"defaultCharsetName\":\"utf8mb4\",\"primaryKeyColumnNames\":[\"config_id\"],\"columns\":[{\"name\":\"config_id\",\"jdbcType\":4,\"typeName\":\"INT\",\"typeExpression\":\"INT\",\"charsetName\":null,\"position\":1,\"optional\":false,\"autoIncremented\":true,\"generated\":true},{\"name\":\"config_name\",\"jdbcType\":12,\"typeName\":\"VARCHAR\",\"typeExpression\":\"VARCHAR\",\"charsetName\":\"utf8mb4\",\"length\":100,\"position\":2,\"optional\":true,\"autoIncremented\":false,\"generated\":false},{\"name\":\"config_key\",\"jdbcType\":12,\"typeName\":\"VARCHAR\",\"typeExpression\":\"VARCHAR\",\"charsetName\":\"utf8mb4\",\"length\":100,\"position\":3,\"optional\":true,\"autoIncremented\":false,\"generated\":false},{\"name\":\"config_value\",\"jdbcType\":12,\"typeName\":\"VARCHAR\",\"typeExpression\":\"VARCHAR\",\"charsetName\":\"utf8mb4\",\"length\":500,\"position\":4,\"optional\":true,\"autoIncremented\":false,\"generated\":false},{\"name\":\"config_type\",\"jdbcType\":1,\"typeName\":\"CHAR\",\"typeExpression\":\"CHAR\",\"charsetName\":\"utf8mb4\",\"length\":1,\"position\":5,\"optional\":true,\"autoIncremented\":false,\"generated\":false},{\"name\":\"create_by\",\"jdbcType\":12,\"typeName\":\"VARCHAR\",\"typeExpression\":\"VARCHAR\",\"charsetName\":\"utf8mb4\",\"length\":64,\"position\":6,\"optional\":true,\"autoIncremented\":false,\"generated\":false},{\"name\":\"create_time\",\"jdbcType\":93,\"typeName\":\"DATETIME\",\"typeExpression\":\"DATETIME\",\"charsetName\":null,\"position\":7,\"optional\":true,\"autoIncremented\":false,\"generated\":false},{\"name\":\"update_by\",\"jdbcType\":12,\"typeName\":\"VARCHAR\",\"typeExpression\":\"VARCHAR\",\"charsetName\":\"utf8mb4\",\"length\":64,\"position\":8,\"optional\":true,\"autoIncremented\":false,\"generated\":false},{\"name\":\"update_time\",\"jdbcType\":93,\"typeName\":\"DATETIME\",\"typeExpression\":\"DATETIME\",\"charsetName\":null,\"position\":9,\"optional\":true,\"autoIncremented\":false,\"generated\":false},{\"name\":\"remark\",\"jdbcType\":12,\"typeName\":\"VARCHAR\",\"typeExpression\":\"VARCHAR\",\"charsetName\":\"utf8mb4\",\"length\":500,\"position\":10,\"optional\":true,\"autoIncremented\":false,\"generated\":false},{\"name\":\"test1\",\"jdbcType\":12,\"typeName\":\"VARCHAR\",\"typeExpression\":\"VARCHAR\",\"charsetName\":\"utf8mb4\",\"length\":255,\"position\":11,\"optional\":true,\"autoIncremented\":false,\"generated\":false}]}}]}"}
  • Log-based CDC schema-change records: recreating a table

A DROP first, then a CREATE, and finally the data is synchronized:

{"source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1650616228090,"snapshot":"false","db":"xsy_flowreplay","sequence":null,"table":"sys_config_copy1","server_id":1,"gtid":null,"file":"binlog.000117","pos":291987079,"row":0,"thread":null,"query":null},"historyRecord":"{\"source\":{\"file\":\"binlog.000117\",\"pos\":291987079,\"server_id\":1},\"position\":{\"transaction_id\":null,\"ts_sec\":1650616228,\"file\":\"binlog.000117\",\"pos\":291987241,\"server_id\":1},\"databaseName\":\"xsy_flowreplay\",\"ddl\":\"DROP TABLE IF EXISTS `xsy_flowreplay`.`sys_config_copy1`\",\"tableChanges\":[]}"}
{"source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1650616228222,"snapshot":"false","db":"xsy_flowreplay","sequence":null,"table":"sys_config_copy1","server_id":1,"gtid":null,"file":"binlog.000117","pos":291987320,"row":0,"thread":null,"query":null},"historyRecord":"{\"source\":{\"file\":\"binlog.000117\",\"pos\":291987320,\"server_id\":1},\"position\":{\"transaction_id\":null,\"ts_sec\":1650616228,\"file\":\"binlog.000117\",\"pos\":291988602,\"server_id\":1},\"databaseName\":\"xsy_flowreplay\",\"ddl\":\"CREATE TABLE `sys_config_copy1` (\\n  `config_id` int NOT NULL AUTO_INCREMENT COMMENT '参数主键',\\n  `config_name` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin DEFAULT '' COMMENT '参数名称',\\n  `config_key` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin DEFAULT '' COMMENT '参数键名',\\n  `config_value` varchar(500) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin DEFAULT '' COMMENT '参数键值',\\n  `config_type` char(1) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin DEFAULT 'N' COMMENT '系统内置(Y是 N否)',\\n  `create_by` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin DEFAULT '' COMMENT '创建者',\\n  `create_time` datetime DEFAULT NULL COMMENT '创建时间',\\n  `update_by` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin DEFAULT '' COMMENT '更新者',\\n  `update_time` datetime DEFAULT NULL COMMENT '更新时间',\\n  `remark` varchar(500) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin DEFAULT NULL COMMENT '备注',\\n  `test1` varchar(255) COLLATE utf8mb4_bin DEFAULT NULL,\\n  PRIMARY KEY (`config_id`) USING BTREE\\n) ENGINE=InnoDB AUTO_INCREMENT=6 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin ROW_FORMAT=COMPACT COMMENT='参数配置表'\",\"tableChanges\":[{\"type\":\"CREATE\",\"id\":\"\\\"xsy_flowreplay\\\".\\\"sys_config_copy1\\\"\",\"table\":{\"defaultCharsetName\":\"utf8mb4\",\"primaryKeyColumnNames\":[\"config_id\"],\"columns\":[{\"name\":\"config_id\",\"jdbcType\":4,\"typeName\":\"INT\",\"typeExpression\":\"INT\",\"charsetName\":null,\"position\":1,\"optional\":false,\"autoIncremented\":true,\"generated\":true},{\"name\":\"config_name\",\"jdbcType\":12,\"typeName\":\"VARCHAR\",\"typeExpression\":\"VARCHAR\",\"charsetName\":\"utf8mb4\",\"length\":100,\"position\":2,\"optional\":true,\"autoIncremented\":false,\"generated\":false},{\"name\":\"config_key\",\"jdbcType\":12,\"typeName\":\"VARCHAR\",\"typeExpression\":\"VARCHAR\",\"charsetName\":\"utf8mb4\",\"length\":100,\"position\":3,\"optional\":true,\"autoIncremented\":false,\"generated\":false},{\"name\":\"config_value\",\"jdbcType\":12,\"typeName\":\"VARCHAR\",\"typeExpression\":\"VARCHAR\",\"charsetName\":\"utf8mb4\",\"length\":500,\"position\":4,\"optional\":true,\"autoIncremented\":false,\"generated\":false},{\"name\":\"config_type\",\"jdbcType\":1,\"typeName\":\"CHAR\",\"typeExpression\":\"CHAR\",\"charsetName\":\"utf8mb4\",\"length\":1,\"position\":5,\"optional\":true,\"autoIncremented\":false,\"generated\":false},{\"name\":\"create_by\",\"jdbcType\":12,\"typeName\":\"VARCHAR\",\"typeExpression\":\"VARCHAR\",\"charsetName\":\"utf8mb4\",\"length\":64,\"position\":6,\"optional\":true,\"autoIncremented\":false,\"generated\":false},{\"name\":\"create_time\",\"jdbcType\":93,\"typeName\":\"DATETIME\",\"typeExpression\":\"DATETIME\",\"charsetName\":null,\"position\":7,\"optional\":true,\"autoIncremented\":false,\"generated\":false},{\"name\":\"update_by\",\"jdbcType\":12,\"typeName\":\"VARCHAR\",\"typeExpression\":\"VARCHAR\",\"charsetName\":\"utf8mb4\",\"length\":64,\"position\":8,\"optional\":true,\"autoIncremented\":false,\"generated\":
false},{\"name\":\"update_time\",\"jdbcType\":93,\"typeName\":\"DATETIME\",\"typeExpression\":\"DATETIME\",\"charsetName\":null,\"position\":9,\"optional\":true,\"autoIncremented\":false,\"generated\":false},{\"name\":\"remark\",\"jdbcType\":12,\"typeName\":\"VARCHAR\",\"typeExpression\":\"VARCHAR\",\"charsetName\":\"utf8mb4\",\"length\":500,\"position\":10,\"optional\":true,\"autoIncremented\":false,\"generated\":false},{\"name\":\"test1\",\"jdbcType\":12,\"typeName\":\"VARCHAR\",\"typeExpression\":\"VARCHAR\",\"charsetName\":\"utf8mb4\",\"length\":255,\"position\":11,\"optional\":true,\"autoIncremented\":false,\"generated\":false}]}}]}"}
{"before":null,"after":{"config_id":1,"config_name":"Timothy Mendoza","config_key":"Paai6QCGyu","config_value":"RSVnKEpg7H","config_type":"U","create_by":"","create_time":997278609000,"update_by":"2008-09-20","update_time":1283015983000,"remark":"SvhrczrD0p","test1":null},"source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1650616228000,"snapshot":"false","db":"xsy_flowreplay","sequence":null,"table":"sys_config_copy1","server_id":1,"gtid":null,"file":"binlog.000117","pos":291988870,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1650616228259,"transaction":null}

5. Tenant data filtering

Tables without a tenant_id column are synchronized in full:

/**
 * Tenant ids to synchronize
 */
private Set<String> tenantIds = new HashSet<>();
private String dbNameSuffix;

public SqlDeserializationSchema(String[] tenantIds, String dbNameSuffix) {
    this.tenantIds.addAll(Arrays.asList(tenantIds));
    this.dbNameSuffix = dbNameSuffix;
}

/**
 * Checks whether the record belongs to one of the specified tenants;
 * records without a tenant id always pass.
 */
private boolean isSpecifiedTenantId(Struct valueStruct, Envelope.Operation op) {
    Long tenantId;
    switch (op) {
        case READ:
        case CREATE:
        case UPDATE:
            tenantId = getTenantId(valueStruct.getStruct("after"));
            break;
        case DELETE:
            tenantId = getTenantId(valueStruct.getStruct("before"));
            break;
        default:
            return true;
    }
    if (tenantId == null) {
        return true;
    }
    log.debug("tenant ids: {}", tenantIds);
    return tenantIds.contains(tenantId.toString());
}
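The getTenantId helper referenced above is not shown in the original listing; a minimal sketch, assuming the tenant column is named tenant_id:

/**
 * Sketch (assumption): reads the tenant id from a row struct. Returns null when
 * the table has no tenant_id column, so such tables are synchronized in full.
 */
private Long getTenantId(Struct rowStruct) {
    if (rowStruct == null || rowStruct.schema().field("tenant_id") == null) {
        return null;
    }
    Object value = rowStruct.get("tenant_id");
    return value == null ? null : ((Number) value).longValue();
}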

6. Assembling the SQL statements to execute on the target database

When assembling the SQL we must decide from each field's data type whether its value needs single quotes, so type detection matters during assembly:

    /**
     * Assembles the DML statement for one change record.
     */
    private Tuple2<MysqlOperate, String> dmlHandle(Struct valueStruct, Envelope.Operation operation,
                                                   String database, List<Field> primaryKeys) {
        Struct sourceStruct = valueStruct.getStruct("source");
        // table name
        String table = sourceStruct.getString("table");
        // op=d records carry no "after"
        List<Tuple3<String, Object, SqlType>> afterList = null;
        // op=r records carry no "before"
        List<Tuple3<String, Object, SqlType>> beforeList = null;
        Struct afterStruct = (Struct) valueStruct.get("after");
        if (afterStruct != null) {
            afterList = getChangeDataMap(afterStruct);
        }
        Struct beforeStruct = (Struct) valueStruct.get("before");
        if (beforeStruct != null) {
            beforeList = getChangeDataMap(beforeStruct);
        }
        MysqlOperate type = MysqlOperate.UNDEFINE;
        String sql = "";
        switch (operation) {
            case READ:
            case CREATE:
                type = MysqlOperate.INSERT;
                sql = insertSql(database, table, afterList);
                break;
            case DELETE:
                type = MysqlOperate.DELETE;
                sql = deleteSql(database, table, beforeList, primaryKeys);
                break;
            case UPDATE:
                type = MysqlOperate.UPDATE;
                sql = updateSql2(database, table, afterList, beforeList, primaryKeys);
                break;
            default:
                log.error("illegal operation type");
        }
        return new Tuple2<>(type, sql);
    }
    

The concrete SQL assembly, including the field type conversion:

public class DebeziumMakingSql {

    public final static String SYMBOL_POINT = ".";
    public final static String SYMBOL_COMMA = ",";
    public final static String SYMBOL_EQUAL = "=";
    public final static String SYMBOL_BACK_SINGLE_QUOTE = "`";
    public final static String SYMBOL_SINGLE_QUOTE = "'";
    public final static String JOINER_VALUES = " VALUES ";
    public final static String JOINER_AND = " AND ";

    /**
     * Assembles an INSERT statement.
     * @param db database name
     * @param table table name
     * @param list field-value-type triples
     * @return the INSERT statement
     */
    public static String insertSql(String db, String table, List<Tuple3<String, Object, SqlType>> list) {
        // part 1: INSERT INTO `xsy_flowreplay`.`tb_env`
        StringBuilder sqlBuilder = new StringBuilder("INSERT INTO ");
        sqlBuilder = backSingleQuoteCocoon(sqlBuilder, db).append(SYMBOL_POINT);
        sqlBuilder = backSingleQuoteCocoon(sqlBuilder, table);
        // part 2: (`id`,`name`)
        StringBuilder fieldBuilder = new StringBuilder("(");
        // part 3: (1,'name')
        StringBuilder valueBuilder = new StringBuilder("(");
        list.forEach(tuple -> {
            // assemble the field names
            backSingleQuoteCocoon(fieldBuilder, tuple.f0).append(SYMBOL_COMMA);
            // assemble the values
            spliceValue(valueBuilder, tuple, SYMBOL_COMMA);
        });
        // drop the trailing comma from each part
        String fieldStr = fieldBuilder.toString().trim().substring(0, fieldBuilder.toString().lastIndexOf(SYMBOL_COMMA));
        String valueStr = valueBuilder.toString().trim().substring(0, valueBuilder.toString().lastIndexOf(SYMBOL_COMMA));
        sqlBuilder.append(fieldStr).append(")").append(JOINER_VALUES).append(valueStr).append(");");
        return sqlBuilder.toString();
    }

    /**
     * Assembles an UPDATE statement; setList and whereList must line up.
     * @param db database name
     * @param table table name
     * @param setList the "after" fields
     * @param whereList the "before" fields
     * @return the UPDATE statement
     */
    public static String updateSql(String db, String table, List<Tuple3<String, Object, SqlType>> setList
            , List<Tuple3<String, Object, SqlType>> whereList, List<Field> primaryKeys) {
        // only the fields that differ need to be SET
        setList.removeAll(whereList);
        // part 1, e.g. UPDATE `xsy_flowreplay`.`aa` SET `name` = 'Shen Ziyi111' WHERE `id` = 502
        StringBuilder sqlBuilder = new StringBuilder("UPDATE ");
        sqlBuilder = backSingleQuoteCocoon(sqlBuilder, db).append(SYMBOL_POINT);
        sqlBuilder = backSingleQuoteCocoon(sqlBuilder, table);
        sqlBuilder.append(" SET ");
        // part 2: the SET clause
        StringBuilder setBuilder = new StringBuilder();
        setList.forEach(tuple -> spliceData(setBuilder, tuple, SYMBOL_COMMA));
        // part 3: the WHERE clause, restricted to the primary key columns
        StringBuilder whereBuilder = new StringBuilder();
        whereList.forEach(tuple -> {
            for (Field primaryKey : primaryKeys) {
                if (tuple.f0.equals(primaryKey.name())) {
                    spliceData(whereBuilder, tuple, JOINER_AND);
                }
            }
        });
        // drop the trailing comma
        String setStr = setBuilder.toString().trim().substring(0, setBuilder.toString().lastIndexOf(SYMBOL_COMMA));
        // drop the trailing AND
        String whereStr = whereBuilder.toString().substring(0, whereBuilder.toString().lastIndexOf(JOINER_AND));
        sqlBuilder.append(setStr).append(" WHERE ").append(whereStr).append(";");
        return sqlBuilder.toString();
    }

    /**
     * Converts an UPDATE into a DELETE followed by an INSERT.
     * @param db database name
     * @param table table name
     * @param afterList the "after" fields
     * @param beforeList the "before" fields
     * @return the combined DELETE + INSERT statements
     */
    public static String updateSql2(String db, String table, List<Tuple3<String, Object, SqlType>> afterList
            , List<Tuple3<String, Object, SqlType>> beforeList, List<Field> primaryKeys) {
        // "before" becomes the DELETE, "after" becomes the INSERT
        String deleteSql = deleteSql(db, table, beforeList, primaryKeys);
        String insertSql = insertSql(db, table, afterList);
        return deleteSql + insertSql;
    }
    /**
     * Assembles a DELETE statement keyed on the primary key columns.
     * @param db database name
     * @param table table name
     * @param whereList the "before" fields
     * @return the DELETE statement
     */
    public static String deleteSql(String db, String table, List<Tuple3<String, Object, SqlType>> whereList, List<Field> primaryKeys) {
        // e.g. DELETE FROM `xsy_flowreplay`.`gen_table_copy1` WHERE `table_name` = 'rrerttr';
        StringBuilder sqlBuilder = new StringBuilder("DELETE FROM ");
        sqlBuilder = backSingleQuoteCocoon(sqlBuilder, db).append(SYMBOL_POINT);
        sqlBuilder = backSingleQuoteCocoon(sqlBuilder, table).append(" WHERE ");

        StringBuilder whereBuilder = new StringBuilder();
        whereList.forEach(tuple -> {
            for (Field primaryKey : primaryKeys) {
                if (tuple.f0.equals(primaryKey.name())) {
                    spliceData(whereBuilder, tuple, JOINER_AND);
                }
            }
        });
        // drop the trailing AND
        String whereStr = whereBuilder.toString().substring(0, whereBuilder.toString().lastIndexOf(JOINER_AND));
        sqlBuilder.append(whereStr).append(";");
        return sqlBuilder.toString();
    }
    /**
     * Appends one `field` = value pair.
     */
    public static void spliceData(StringBuilder builder, Tuple3<String, Object, SqlType> tuple, String symbol) {
        backSingleQuoteCocoon(builder, tuple.f0).append(SYMBOL_EQUAL);
        spliceValue(builder, tuple, symbol);
    }

    /**
     * Appends one value, quoted according to its SQL type.
     */
    public static void spliceValue(StringBuilder builder, Tuple3<String, Object, SqlType> tuple, String symbol) {
        switch (tuple.f2) {
            case MYSQL_INT:
                builder.append(tuple.f1).append(symbol);
                break;
            case MYSQL_DATATIME:
                String dateStr = DateUtils.timeStamp2Date((Long) tuple.f1, DateUtils.YYYY_MM_DD_HH_MM_SS);
                singleQuoteCocoon(builder, dateStr).append(symbol);
                break;
            case MYSQL_TIMESTAMP:
                String timestampStr = DateUtils.utc2Date(String.valueOf(tuple.f1), DateUtils.YYYY_MM_DD_HH_MM_SS);
                singleQuoteCocoon(builder, timestampStr).append(symbol);
                break;
            case MYSQL_VARCHAR:
                singleQuoteCocoon(builder, tuple.f1).append(symbol);
                break;
            default:
        }
    }

    /**
     * Wraps in backticks: `id`
     */
    public static StringBuilder backSingleQuoteCocoon(StringBuilder sb, Object str) {
        return sb.append(SYMBOL_BACK_SINGLE_QUOTE).append(str).append(SYMBOL_BACK_SINGLE_QUOTE);
    }

    /**
     * Wraps in single quotes, escaping special characters: 'lkn'
     */
    public static StringBuilder singleQuoteCocoon(StringBuilder sb, Object obj) {
        String value = CovertUtils.coverSqlSymbol(String.valueOf(obj));
        return sb.append(SYMBOL_SINGLE_QUOTE).append(value).append(SYMBOL_SINGLE_QUOTE);
    }

    /**
     * Parses Debezium's before/after struct into (field, value, type) triples.
     */
    public static List<Tuple3<String, Object, SqlType>> getChangeDataMap(Struct struct) {
        // wrap the changed row's non-null fields as triples
        List<Tuple3<String, Object, SqlType>> collect = struct.schema().fields().stream()
                .map(Field::name)
                .filter(fieldName -> struct.get(fieldName) != null)
                .map(fieldName -> {
                    SqlType sqlType = transMysqlType(struct, fieldName);
                    return Tuple3.of(fieldName, struct.get(fieldName), sqlType);
                })
                .collect(Collectors.toList());
        if (CollectionUtils.isEmpty(collect)) {
            return null;
        }
        return collect;
    }

    /**
     * Maps a Debezium field type to the SqlType used for SQL assembly.
     */
    private static SqlType transMysqlType(Struct struct, String fieldName) {
        Schema.Type type = struct.schema().field(fieldName).schema().type();
        String fileTypeDebe = struct.schema().field(fieldName).schema().name();
        if (type == Schema.Type.STRING && StringUtils.isBlank(fileTypeDebe)) {
            return SqlType.MYSQL_VARCHAR;
        }
        if (type == Schema.Type.STRING && StringUtils.isNotBlank(fileTypeDebe) && fileTypeDebe.equals("io.debezium.time.ZonedTimestamp")) {
            return SqlType.MYSQL_TIMESTAMP;
        }
        if (StringUtils.isNotBlank(fileTypeDebe) && fileTypeDebe.contains("io.debezium.time")) {
            return SqlType.MYSQL_DATATIME;
        }
        if (type == Schema.Type.BYTES && StringUtils.isNotBlank(fileTypeDebe) && fileTypeDebe.equals("org.apache.kafka.connect.data.Decimal")) {
            return SqlType.MYSQL_INT;
        }
        // numeric schema types
        if (type == Schema.Type.INT8 || type == Schema.Type.INT16 || type == Schema.Type.INT32 || type == Schema.Type.INT64 ||
                type == Schema.Type.BOOLEAN || type == Schema.Type.FLOAT32 || type == Schema.Type.FLOAT64) {
            return SqlType.MYSQL_INT;
        }
        // types not handled yet fall back to the unquoted (int) path for now
        return SqlType.MYSQL_INT;
    }

}

Following the database synchronization tools already on the market, and on the DBAs' advice, we removed the UPDATE path and replace each update with a DELETE + INSERT pair, as the usage sketch below shows.
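As a quick illustration of the helpers above (field names and values are borrowed from the sample records earlier; the expected output appears in the comment):

import java.util.Arrays;
import java.util.List;
import org.apache.flink.api.java.tuple.Tuple3;

public class DebeziumMakingSqlExample {
    public static void main(String[] args) {
        List<Tuple3<String, Object, SqlType>> row = Arrays.asList(
                Tuple3.<String, Object, SqlType>of("config_id", 1, SqlType.MYSQL_INT),
                Tuple3.<String, Object, SqlType>of("config_name", "Timothy Mendoza", SqlType.MYSQL_VARCHAR));
        System.out.println(DebeziumMakingSql.insertSql("xsy_flowreplay", "sys_config_copy1", row));
        // INSERT INTO `xsy_flowreplay`.`sys_config_copy1`(`config_id`,`config_name`) VALUES (1,'Timothy Mendoza');
    }
}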

7. Defining a database connection pool

Because the sink connects to the target database to execute the SQL, we define our own connection pool, both for efficiency and to keep the database connections from dropping:

public class HikariMysqlSink extends RichSinkFunction<OperateSqlModel> {

    private HikariDataSource dataSource = null;
    // the database connection
    private Connection connection = null;
    // the PreparedStatement used to execute each statement
    private PreparedStatement ps = null;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        ParameterTool parameter = (ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
        // despite the key name, target-dbs-driver holds the target JDBC URL
        String dbsUrl = parameter.getRequired("target-dbs-driver");
        String user = parameter.getRequired("target-dbs-user");
        String password = parameter.getRequired("target-dbs-password");

        HikariConfig hikariConfig = new HikariConfig();
        hikariConfig.setPoolName("HikariPool");
        hikariConfig.setDataSourceClassName("com.mysql.cj.jdbc.MysqlDataSource");
        hikariConfig.addDataSourceProperty("user", user);
        hikariConfig.addDataSourceProperty("password", password);
        hikariConfig.addDataSourceProperty("url", dbsUrl);
        hikariConfig.setMaximumPoolSize(15);
        hikariConfig.setMinimumIdle(3);
        hikariConfig.setKeepaliveTime(30000);

        this.dataSource = new HikariDataSource(hikariConfig);
        // open the connection to the target database
        this.connection = dataSource.getConnection();
    }

    @Override
    public void close() throws Exception {
        super.close();
        // release resources
        if (this.dataSource != null) {
            this.dataSource.close();
        }
        if (this.ps != null) {
            this.ps.close();
        }
        if (this.connection != null) {
            this.connection.close();
        }
    }

    @Override
    public void invoke(OperateSqlModel operateSqlModel, Context context) throws Exception {
        try {
            // prepare and execute the statement
            switch (operateSqlModel.getOp()) {
                case INSERT:
                case UPDATE:
                case DELETE:
                    this.ps = connection.prepareStatement(operateSqlModel.getDml());
                    this.ps.executeUpdate();
                    break;
                case DDL:
                    this.ps = connection.prepareStatement(operateSqlModel.getDdl());
                    this.ps.executeUpdate();
                    break;
                case UNDEFINE:
                    log.error("unsupported operation type {}", operateSqlModel);
                    break;
                default:
            }
        } catch (Exception e) {
            log.error("failed to execute sql {}", operateSqlModel, e);
        } finally {
            // release the statement
            if (this.ps != null) {
                this.ps.close();
            }
            log.debug("done");
        }
    }
}

8. Configuration

To make the tool easier to use, the commonly changed parameters are extracted into a configuration file that can be adjusted to the user's needs at any time. The current version of the file:

# Source database settings
dbs-ip=127.0.0.1
dbs-port=3306
dbs-username=flink_cdc
dbs-password=123456
dbs-name=auc_passport,xsy_twitter
dbs-table=auc_passport.*,xsy_twitter.*
# Suffix appended when the target database names differ from the source
db-name-suffix=
# Whether to synchronize DDL as well
ddl-enable=true
# Tenant ids to synchronize
tenant-id=123,124,125
# Startup mode: INITIAL or LATEST_OFFSET
startup-options-key=INITIAL
startup-options-value=
# Must differ between jobs that separately sync databases on the same MySQL instance
server-id=1001-1100

# Target database settings
target-dbs-driver=jdbc:mysql://127.0.0.1:3307/auc_passport?useUnicode=true&characterEncoding=utf-8&autoReconnect=true&useSSL=false&serverTimezone=Asia/Shanghai&allowMultiQueries=true
target-dbs-user=flink_cdc
target-dbs-password=123456

# env
# Remote storage for savepoints
cp-dir=hdfs://devops/user/syncdata/flinkCDC
cp-user=HADOOP_USER_NAME
cp-password=flowreplay

#cp-parallelism=5
# Parallelism of the JDBC reads against the source; mind the source load
cp-source-parallelism=2
# Parallelism of the writes to the target
cp-sink-parallelism=16
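To show how this configuration drives the job, here is a minimal sketch of the end-to-end wiring (builder method names follow the Flink CDC 2.x DataStream API; the properties file name and job name are illustrative):

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SyncJob {
    public static void main(String[] args) throws Exception {
        ParameterTool params = ParameterTool.fromPropertiesFile("sync.properties");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // makes the target-dbs-* keys visible to HikariMysqlSink.open(...)
        env.getConfig().setGlobalJobParameters(params);

        MySqlSource<OperateSqlModel> source = MySqlSource.<OperateSqlModel>builder()
                .hostname(params.getRequired("dbs-ip"))
                .port(params.getInt("dbs-port"))
                .databaseList(params.getRequired("dbs-name").split(","))
                .tableList(params.getRequired("dbs-table").split(","))
                .username(params.getRequired("dbs-username"))
                .password(params.getRequired("dbs-password"))
                .serverId(params.getRequired("server-id"))
                .startupOptions(StartupOptions.initial())
                .deserializer(new SqlDeserializationSchema(
                        params.getRequired("tenant-id").split(","),
                        params.get("db-name-suffix")))
                .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "mysql-cdc-source")
                .setParallelism(params.getInt("cp-source-parallelism"))
                .addSink(new HikariMysqlSink())
                .setParallelism(params.getInt("cp-sink-parallelism"));

        env.execute("flowreplay-db-sync");
    }
}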

Challenges and Parameter Tuning

This section combines the hard problems hit during development and testing with parameter tuning, because most of those problems ultimately came down to tuning; some rough edges in the product itself also make tuning especially important. The problems that left the deepest impression were:

Q: Synchronizing a large, wide table brought the Flink cluster down
A: Checkpoints (CP, snapshots of the current job context) fired too frequently and failed too often, until the number of failed-checkpoint retries exceeded the tolerance and the Flink job went down.
   The fix was to lengthen the checkpoint interval and to raise the tolerance for checkpoint failures, as sketched below.
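In Flink API terms the knobs involved look roughly like this (the interval and tolerance values are illustrative, not the exact ones we run):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// fire checkpoints less often during the heavy snapshot phase
env.enableCheckpointing(10 * 60 * 1000L);
// give slow checkpoints time to finish instead of counting them as failures
env.getCheckpointConfig().setCheckpointTimeout(10 * 60 * 1000L);
// tolerate more checkpoint failures before failing the job
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(10);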
   
Q: How should chunkSize be set for uniformly versus non-uniformly distributed primary keys (PKs)?
A: chunkSize defaults to 8192 and fetchSize to 1024. Auto-increment and timestamp PKs are uniformly distributed; UUID PKs are not. Full synchronization uses the query-based CDC path, and the queries that split a table into chunks are expensive. With a uniformly distributed PK the chunks come out roughly equal, whereas with a non-uniform PK the chunk sizes can differ widely, so for non-uniform PKs chunkSize should be raised to reduce the number of splits and hence the cost of the splitting queries. fetchSize (rows per query round trip) can be sized to the Flink cluster's memory and the row width; with plenty of memory it can be raised, as the sketch below shows.
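On the source builder these two knobs correspond to splitSize and fetchSize (names as in the Flink CDC 2.x MySqlSource builder; the values are illustrative for a non-uniform PK):

MySqlSourceBuilder<OperateSqlModel> builder = MySqlSource.<OperateSqlModel>builder()
        // larger chunks mean fewer splitting queries for a non-uniform PK
        .splitSize(32768)
        // rows fetched per round trip; raise it when memory allows
        .fetchSize(2048);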

Q: Which synchronization mode to choose?
A: Since Flink CDC 2.0 only INITIAL and LATEST_OFFSET are supported. LATEST_OFFSET never snapshots the monitored tables on first start; it reads from the tail of the binlog, meaning only changes made after the connector starts. INITIAL takes an initial snapshot of the monitored tables on first start and then continues reading the latest binlog. Starting from a specified binlog file offset, or from a specified timestamp, may be implemented later. In our application we use INITIAL in most cases; the sketch below shows the choice in code.
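Mapped onto the startup-options-key setting from the configuration file, the choice is a sketch like:

// startup-options-key is INITIAL or LATEST_OFFSET (see the configuration section)
StartupOptions startup = "LATEST_OFFSET".equals(params.get("startup-options-key"))
        ? StartupOptions.latest()    // binlog tail only: changes after the connector starts
        : StartupOptions.initial();  // snapshot first, then keep reading the binlog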

Q: How should memory be sized for full snapshots versus incremental binlog reads?
A: As of CDC 2.2, the full snapshot read is relatively heavy on JobManager memory, while incremental binlog reading is heavy on TaskManager memory; the community plans to improve this later. The current workaround is to give the JobManager more memory for snapshot-heavy runs and the TaskManager more memory for binlog-heavy runs, along the lines of the sketch below.
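In flink-conf.yaml this amounts to something like the following (the keys are standard Flink memory options; the sizes are illustrative):

# snapshot-heavy runs: give the JobManager more memory
jobmanager.memory.process.size: 4096m
# binlog-heavy runs: give the TaskManager more memory
taskmanager.memory.process.size: 8192m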

Q: Character set incompatibility
A: Flink CDC uses the UTF-8 character set by default, but some of the company's early wide tables use ucs2, so synchronizing their data produced garbled text. To support ucs2 we recompiled the Flink CDC source, adding a ucs2 branch at the charset conversion point:

    com.ververica.cdc.connectors.mysql.debezium.task.MySqlSnapshotSplitReadTask#readField
    if (!"CHAR".equals(actualColumn.typeName()) && !"VARCHAR".equals(actualColumn.typeName()) && !"TEXT".equals(actualColumn.typeName())) {
        return rs.getObject(fieldNo);
    } else {
        return "ucs2".equalsIgnoreCase(actualColumn.charsetName()) ? (new String(rs.getBytes(fieldNo), StandardCharsets.UTF_8)).getBytes(StandardCharsets.UTF_16) : rs.getBytes(fieldNo);
    }

Q: How to set the parallelism?
A: Read parallelism puts I/O pressure on the source database, so it must be sized to the source's configuration. For our gray (canary) and sandbox environments, repeated testing led us to a source read parallelism of 2, which keeps the impact on the source small. The parallelism of writes to the target governs the synchronization rate; it is currently 18, which lets writes keep pace with reads comfortably. This maximizes throughput without disturbing the source's online traffic.

Q: Resuming from a breakpoint
A: The traffic replay platform relies on resumable synchronization constantly. Because the baseline data volume is large, the first full synchronization takes more than 20 hours; after that, each completed replay is followed by an incremental sync. The increment resumes from the context stored when the previous task stopped, so each run builds on the last; repeating this cycle satisfies the platform's synchronization needs, as the CLI sketch below illustrates.
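With the Flink CLI the cycle can be sketched as follows (<jobId>, the savepoint directory name, and the jar name are placeholders; cp-dir comes from the configuration above):

# stop the sync job, writing a savepoint to the configured cp-dir
flink stop --savepointPath hdfs://devops/user/syncdata/flinkCDC <jobId>
# after the next replay, resume the incremental sync from that savepoint
flink run -s hdfs://devops/user/syncdata/flinkCDC/savepoint-<id> sync-tool.jar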

Performance

Synchronization performance depends heavily on the configurations of the source and target databases. So far we have measured two paths, gray → P17 and sandbox → P17:

  • Gray → P17: about 24 GB/h
  • Sandbox → P17: about 50 GB/h

In both environments the incremental (real-time) synchronization lag stays within one minute, and most of the time is only a few seconds.
When adopting this tool, test and tune the parameters against your own environment.

Closing Thoughts

That Flink CDC could emerge from so many CDC solutions as the most popular CDC framework today reflects how complete and powerful it is, as the comparison earlier shows. The synchronization tool we built is only one direction of applying Flink CDC; when we started, quite a few people in the community were building database synchronization tools on the same base. Building it was also my first real contact with big data, and it turned out to be great fun. Under the hood Flink CDC wraps Debezium, and improves on it with lock-free full synchronization and chunked, multi-task parallel reads, both excellent designs.
If time allows, we plan to add a UI on top of the current tool, so that tasks can be started and stopped from a page and their details and progress shown to users, and to promote the tool to other departments.
