暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

自定义canal客户端,实现mysql同步到redis[附带源码]

Elasticsearch之家 2022-09-29
1777

0. 引言

我们在做mysql与redis的数据同步时,往往采用的是代码层实现,或者通过spring-cache等缓存框架。但是仍然有某些场景,比如说原项目无源码,或者不能进行二开时,就需要独立的第三方来实现数据同步。

我们需要一种无代码入侵式的数据同步,完全由第三方组件管理。

这就需要借助canal来实现mysql到redis的数据同步

1. canal简介

canal是阿里开源的数据同步工具,基于bin log可以将数据库同步到其他各类数据库中,目标数据库支持mysql,postgresql,oracle,redis,MQ,ES等

canal分成服务端deployer和客户端adapter,我们可以部署多个,同时为了方便管理还提供了一个管理端admin

canal的数据同步流程如下图所示



因为目前canal还不能直接通过配置就实现对redis的数据同步,因此我们需要自定义一下canal客户端,通过服务端将数据同步到客户端后,由客户端自定义操作同步到redis



2. 安装

2.1 安装jdk

canal是基于java环境的,因此运行前需要先安装jdk,这里我安装的是jdk11。详细步骤就不再累述了。

canal1.1.5使用jdk1.8即可,以下示例的是canal1.1.6。该版本需要使用jdk11+,否则会报错NoSuchMethodError
,详细报错信息如下:

    java.lang.NoSuchMethodError: java.nio.ByteBuffer.clear()Ljava/nio/ByteBuffer;
    at com.alibaba.otter.canal.client.impl.SimpleCanalConnector.readNextPacket(SimpleCanalConnector.java:412) ~[na:na]
    at com.alibaba.otter.canal.client.impl.SimpleCanalConnector.readNextPacket(SimpleCanalConnector.java:397) ~[na:na]
    at com.alibaba.otter.canal.client.impl.SimpleCanalConnector.doConnect(SimpleCanalConnector.java:155) ~[na:na]
    at com.alibaba.otter.canal.client.impl.SimpleCanalConnector.connect(SimpleCanalConnector.java:116) ~[na:na]
    at com.alibaba.otter.canal.connector.tcp.consumer.CanalTCPConsumer.connect(CanalTCPConsumer.java:63) ~[na:na]
    at com.alibaba.otter.canal.adapter.launcher.loader.AdapterProcessor.process(AdapterProcessor.java:185) ~[client-adapter.launcher-1.1.6.jar:na]

    2.2 安装canal

    1、截止本文,canal的稳定版已更新到1.1.6了, 所以本文也以这个版本为例。

    这里因为我们要自定义客户端,所以只用下载服务端deployer即可

    官方下载地址[1]



    当然也可以通过wget指令直接下载到服务器

      wget https://github.com/alibaba/canal/releases/download/canal-1.1.6/canal.deployer-1.1.6.tar.gz

      详细的安装步骤不再累述了,还不清楚的同学可以参考上一篇文章

      通过canal来实现mysql数据同步到elasticsearch[2]

      3. 实现同步

      3.1 mysql操作步骤

      1、因为同步是基于binlog实现的,所以要现在mysql中开启binlog

      修改mysql配置文件

        vim /etc/my.cnf

        修改内容

          [mysqld]
          log-bin=mysql-bin # 开启 binlog
          binlog-format=ROW # 选择 ROW 模式

          2、源数据库创建一个canal账号,并且设置slave
          ,dump
          权限

            CREATE USER canal IDENTIFIED BY 'canal';  
            GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
            -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
            FLUSH PRIVILEGES;



            3、因为mysql8.0.3后身份检验方式为caching_sha2_password
            ,但canal使用的是mysql_native_password
            ,因此需要设置检验方式(如果该版本之前的可跳过),否则会报错IOException: caching_sha2_password Auth failed

              ALTER USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY 'canal';
              select host,user,plugin from mysql.user ;

              4、创建一个canal_manager
              数据库,编码格式utf8mb4
              (如果未安装admin管理服务则不需要该数据库)

              该数据库用于远程统一配置管理

              导入脚本canal_manager.sql
              ,初始化数据库结构数据

              该脚本文件在canal.admin
              下的conf目录中





              3.2 服务端deployer操作

              1、查询源mysql服务器的binlog位置

                # 源mysql服务器中登陆mysql执行
                show binary logs;



                2、进入deployer安装目录

                  cd deployer

                  3、我们新建一个实例redis
                  专门用于本次演示

                    cd conf
                    # 复制example实例配置
                    cp -R example redis

                    4、修改实例redis配置文件instance.properties

                      cd redis
                      vim instance.properties

                      修改内容

                        # position info
                        # 源数据库地址及端口
                        canal.instance.master.address=192.168.244.17:3306
                        # 开始同步的binlog日志文件,注意这里的binlog文件名以你自己查出来的为准
                        canal.instance.master.journal.name=binlog.000007
                        # 开始同步的binlog文件位置
                        canal.instance.master.position=0
                        # 开始同步时间点 时间戳形式
                        canal.instance.master.timestamp=1546272000000


                        # 数据库账号密码
                        canal.instance.dbUsername=canal
                        canal.instance.dbPassword=canal


                        # 配置不同步mysql库
                        canal.instance.filter.black.regex=mysql\..*

                        mysql数据同步起点说明:

                        canal.instance.master.journal.name + canal.instance.master.position : 精确指定一个binlog位点,进行启动canal.instance.master.timestamp : 指定一个时间戳,canal会自动遍历mysql binlog,找到对应时间戳的binlog位点后,进行启动不指定任何信息:默认从当前数据库的位点,进行启动。(show master status)

                        5、启动服务端

                          ./bin/start.sh

                          6、查看示例日志,无报错则说明启动成功

                            cat logs/redis/redis.log



                            针对服务端的详细配置项解释,可以参考官方文档:

                            配置项解释[3]



                            3.3 客户端client操作

                            1、创建springboot项目,引入依赖spring-data-redis
                            ,fastjson
                            ,lombok
                            ,canal.client

                                     <dependency>
                              <groupId>org.springframework.boot</groupId>
                              <artifactId>spring-boot-starter-data-redis</artifactId>
                              </dependency>


                              <dependency>
                              <groupId>com.alibaba</groupId>
                              <artifactId>fastjson</artifactId>
                              <version>1.2.72</version>
                              </dependency>


                              <dependency>
                              <groupId>org.projectlombok</groupId>
                              <artifactId>lombok</artifactId>
                              <optional>true</optional>
                              </dependency>


                              <dependency>
                              <groupId>top.javatool</groupId>
                              <artifactId>canal-spring-boot-starter</artifactId>
                              <version>1.2.1-RELEASE</version>
                              </dependency>

                              这里需要注意的是官方提供的canal-client依赖如下所示,而上述我们引入的是封装过的第三方包,更加易用。但该canal-spring-boot-starter
                              依赖包目前已经停止维护了,最新版对应的canal-client还是1.2.1-RELEASE
                              版本的。不过不影响我们使用,如果有需要可以下载源码二次开发。

                                 <dependency>
                                <groupId>com.alibaba.otter</groupId>
                                <artifactId>canal.client</artifactId>
                                <version>${canal.version}</version>
                                </dependency>

                                2、修改配置文件application.yml

                                  # 应用名称
                                  spring:
                                  application:
                                  name: canal_client_redis
                                  redis:
                                  host: 127.0.0.1
                                  password:


                                  # 应用服务 WEB 访问端口
                                  server:
                                  port: 8080


                                  # canal服务端地址
                                  canal:
                                  server: 192.168.244.22:11111
                                  # 实例名,与deployer中配置的保持统一
                                  destination: redis


                                  # 设置canal消息日志打印级别
                                  logging:
                                  level:
                                  top.javatool.canal.client: warn

                                  3、因为我们同步的是数据对象,需要进行json序列化,于是配置redis序列化方式为json,创建如下配置类

                                    /**
                                    * @author benjamin_5
                                    * @Description
                                    * @date 2022/9/25
                                    */
                                    @Configuration
                                    @AllArgsConstructor
                                    public class RedisConfig {


                                    private RedisConnectionFactory factory;


                                    /**
                                    * 设置json序列化
                                    * @return
                                    */
                                    @Bean
                                    public RedisTemplate<Object,Object> redisTemplate(){
                                    RedisTemplate<Object,Object> redisTemplate = new RedisTemplate<>();
                                    redisTemplate.setConnectionFactory(factory);
                                    // json序列化
                                    GenericFastJsonRedisSerializer serializer = new GenericFastJsonRedisSerializer();
                                    // key设置json序列化
                                    redisTemplate.setKeySerializer(serializer);
                                    // value设置json序列化
                                    redisTemplate.setValueSerializer(serializer);
                                    // hash结构key设置json序列化
                                    redisTemplate.setHashKeySerializer(serializer);
                                    // hash结构value设置json序列化
                                    redisTemplate.setHashValueSerializer(serializer);
                                    return redisTemplate;
                                    }


                                    }

                                    4、创建实体类,与同步的表格对应

                                    这里需要注意的是,因为mysql中的字段以下划线命名法,实体类中以驼峰命名法,直接同步的话会导致字段名不匹配,于是我们需要通过JPA
                                    注解来映射字段名

                                      /**
                                      * @author benjamin_5
                                      * @Description
                                      * @date 2022/9/25
                                      */
                                      @Data
                                      @Table(name = "bs_ecif")
                                      public class BsEcif implements Serializable {


                                      @Column(name="seq_no")
                                      private Long seqNo;


                                      @Column(name="customer")
                                      private String customer;


                                      @Column(name="id_type")
                                      private String idType;


                                      @Column(name="id_no")
                                      private String idNo;


                                      @Column(name="new_field")
                                      private String newField;


                                      @Override
                                      public String toString() {
                                      return "BsEcif{" +
                                      "seqNo=" + seqNo +
                                      ", customer='" + customer + '\'' +
                                      ", idType='" + idType + '\'' +
                                      ", idNo='" + idNo + '\'' +
                                      ", newField='" + newField + '\'' +
                                      '}';
                                      }
                                      }

                                      5、canal-spring-boot-starter
                                      包提供了EntryHandler
                                      类用于监控表数据更新,于是我们创建一个EntryHandler
                                      实现类,用于实现redis的增删改

                                        /**
                                        * @author benjamin_5
                                        * @Description
                                        * @date 2022/9/25
                                        */
                                        @CanalTable("bs_ecif")
                                        @Component
                                        @AllArgsConstructor
                                        @Slf4j
                                        public class BsEcifHandler implements EntryHandler<BsEcif> {


                                        private final RedisTemplate<Object,Object> redisTemplate;


                                        @Override
                                        public void insert(BsEcif bsEcif) {
                                        log.info("[新增]"+bsEcif.toString());
                                        redisTemplate.opsForValue().set("bs_ecif:"+bsEcif.getSeqNo(),bsEcif);
                                        }


                                        @Override
                                        public void update(BsEcif before, BsEcif after) {
                                        log.info("[更新]"+after.toString());
                                        redisTemplate.opsForValue().set("bs_ecif:"+after.getSeqNo(),after);
                                        }


                                        @Override
                                        public void delete(BsEcif bsEcif) {
                                        log.info("[删除]"+bsEcif.getSeqNo());
                                        redisTemplate.delete("bs_ecif:"+bsEcif.getSeqNo());
                                        }
                                        }

                                        最后提供两个官方示例给大家参考,官方示例采用的是canal-client
                                        包,未经过封装

                                        官方示例[4]

                                        官方示例2[5]

                                        3.4 测试

                                        1、启动项目

                                        2、数据库中添加一行数据



                                        3、查看日志打印



                                        4、连接redis,发现新增数据同步成功



                                        5、修改数据



                                        6、查看redis,同步成功



                                        7、删除数据



                                        8、查看redis,发现数据也同步删除了



                                        至此我们所有类型的操作都测试完成

                                        演示代码下载

                                        本次演示代码,可在如下地址下载,供大家参考:

                                        GIT项目地址[6]

                                        总结

                                        自此我们针对mysql同步到redis的演示就结束了,除了配置实现外,也带大家体验了自定义客户端的实现,以更加易用的第三方封装canal-client包来快速实现数据同步。想要加深理解,真正掌握还需要大家亲自动手操作试试。

                                        我正在申请认证,如果本文对你有帮助的话,希望点个赞支持一下

                                        Elastic  

                                        References

                                        [1]
                                         官方下载地址: https://github.com/alibaba/canal/releases
                                        [2]
                                         通过canal来实现mysql数据同步到elasticsearch: https://blog.csdn.net/qq_24950043/article/details/122421293
                                        [3]
                                         配置项解释: https://github.com/alibaba/canal/wiki/AdminGuide
                                        [4]
                                         官方示例: https://github.com/alibaba/canal/wiki/ClientExample
                                        [5]
                                         官方示例2: https://github.com/alibaba/canal/wiki/ClientAPI
                                        [6]
                                         GIT项目地址: https://gitee.com/wuhanxue/canal_client_redis


                                        文章转载自Elasticsearch之家,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                                        评论