暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

基于MySQL=>ES的数据同步生产方案

Elasticsearch之家 2022-08-09
525

七天,我搞定了mysql、es的同步生产实践


1

简介

canal是阿里开源的数据同步工具,基于bin log可以将数据库同步到其他各类数据库中,目标数据库支持mysql,postgresql,oracle,redis,MQ,ES等

因为canal同步数据具有高性能,无代码入侵的优点,所以生产环境中我们常用canal来实现数据同步

下面我们就通过canal将mysql数据同步到elasticsearch的配置步骤来做详细探讨

如果在安装过程中出现报错,可先到这里查一下是否有相同报错,将会为你节约大量排错时间:canal同步数据到es常见报错[1]

2

下载

2.1

下载canal

因为我们环境是mysql8.0,es7.13.0。而canal1.1.4默认不支持es7.x,所以我这里选择下载1.1.5版本 直接到 canal github地址[2]下载

在这里插入图片描述

下载

    # 服务端
    canal.deployer-1.1.5.tar.gz
    # 客户端
    canal.adapter-1.1.5.tar.gz
    # 管理端
    canal.admin-1.1.5.tar.gz

    2.1.1

    wget下载

    也可以直接在服务器中通过wget指令下载

      wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gz
      wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.admin-1.1.5.tar.gz
      wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.adapter-1.1.5.tar.gz

      2.2

      下载jdk

      因为canal依赖于java环境,所以需要下载安装jdk,这里我选择的是jdk1.8

      3

      配置mysql以及创建es索引

      因为canal是基于bin log来实现的,所以要开启binlog,并且设置binlog模式为行模式。

      3.1

      开启bin log

      这里因为我是mac系统,所以以mac为例,其他系统类似 修改my.cnf

        sudo vim /etc/my.cnf

        如果你是mac系统,这里用管理员打开该文件输入密码如果出现密码正确也无法进入的话,执行下述指令,给文件赋权

          sudo  chmod 755 /etc/my.cnf

          修改内容

            log-bin=mysql-bin
            binlog_format=ROW

            重启mysql服务

              # 我这里是brew安装的mysql,且没有配置环境变量,所以需要到安装路径下执行重启指令
              cd /usr/local/Cellar/mysql/8.0.25_1
              ./bin/mysql.server restart

              登录mysql查看是否开启成功

                show variables like '%log_bin%';
                在这里插入图片描述


                3.2

                创建canal用户


                我们创建一个canal用户,专门用于canal同步数据库使用 登录mysql,执行

                  # 注意大小写
                  CREATE USER canal IDENTIFIED BY 'canaL@123';
                  GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
                  FLUSH PRIVILEGES;

                  如果创建时出现报错:Your password does not satisfy the current policy requirements 这是因为密码太过简单导致,用上特殊字符、数字、字母将密码设置得复杂一些即可。

                  或者你也可以将密码等级设置为LOW

                    # 查看密码等级
                    SHOW VARIABLES LIKE 'validate_password%';
                    set global validate_password.policy=LOW;


                    3.3

                    创建es索引

                      PUT user
                      {
                      "mappings": {
                      "properties": {
                      "code": {
                      "type": "keyword"
                      },
                      "email": {
                      "type": "text",
                      "fields": {
                      "keyword": {
                      "type": "keyword"
                      }
                      }
                      },
                      "realName": {
                      "type": "text",
                      "analyzer": "ik_smart",
                      "fields": {
                      "keyword": {
                      "type": "keyword"
                      }
                      }
                      },
                      "roleId": {
                      "type": "text",
                      "fields": {
                      "keyword": {
                      "type": "keyword"
                      }
                      }
                      },
                      "postId": {
                      "type": "text",
                      "fields": {
                      "keyword": {
                      "type": "keyword"
                      }
                      }
                      },
                      "deptId": {
                      "type": "text",
                      "fields": {
                      "keyword": {
                      "type": "keyword"
                      }
                      }
                      }
                      }
                      },
                      "settings": {
                      "number_of_replicas": 0, # 我的是单节点,所以副本分片设置为0,根据你自己的集群
                      "number_of_shards": 1
                      }
                      }

                      4

                      安装

                      canal官方提供了一些配置指导,包括mysql同步mysql、mq、es的配置,要擅用官方指南[3]

                      4.1

                      安装服务端canal-deployer

                      1、新建文件夹,并将压缩包发送到文件夹内,或者直接使用上述的wget来下载到指定文件夹

                         mkdir canal
                        mkdir canal/deployer
                        # 顺便把另外两个文件夹也创建了
                        mkdir canal/adapter
                        mkdir canal/admin
                        # 上传压缩包,这里选择scp指令上传
                        scp canal.adapter-1.1.5.tar.gz root@172.16.188.2:/var/local/canal/adapter
                        scp canal.deployer-1.1.5.tar.gz root@172.16.188.2:/var/local/canal/deployer
                        scp canal.admin-1.1.5.tar.gz root@172.16.188.2:/var/local/canal/admin

                        2、解压

                          cd /var/local/canal/deployer
                          tar -zxvf canal.deployer-1.1.5.tar.gz

                          3、修改配置文件

                            vi conf/example/instance.properties

                            修改内容: 设置数据库地址,账号和密码设置为上述创建的专用账号canal

                              # 如果是mysql集群,则这里最好设置一个值,随便填,与my.conf中的server-id不同即可;不填也行,启动时会自动生成一个随机id,可能会与server-id相同,但概率极低
                              # canal.instance.mysql.slaveId=0
                              # enable gtid use true/false
                              canal.instance.gtidon=false


                              # position info
                              canal.instance.master.address=172.16.188.1:3306 # canal_manager数据库地址(改这里)
                              # 如果要配置全量同步的话需要配置这里的journal,position,timestamp,后续单独讲解全量同步
                              canal.instance.master.journal.name=
                              canal.instance.master.position=
                              canal.instance.master.timestamp=
                              canal.instance.master.gtid=


                              # rds oss binlog
                              canal.instance.rds.accesskey=
                              canal.instance.rds.secretkey=
                              canal.instance.rds.instanceId=


                              # table meta tsdb info
                              canal.instance.tsdb.enable=true


                              # username/password
                              canal.instance.dbUsername=canal # 数据库账号,上述创建的canal账号(改这里)
                              canal.instance.dbPassword=canaL@123 # 数据库密码(改这里)
                              canal.instance.connectionCharset = UTF-8
                              # enable druid Decrypt database password
                              canal.instance.enableDruid=false
                              #canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==


                              # table regex
                              canal.instance.filter.regex=.*\\..*
                              # table black regex
                              canal.instance.filter.black.regex=


                              # mq config
                              canal.mq.topic=example
                              canal.mq.partition=0

                              4、将mysql驱动器替换成8.0版本。因为我的数据库是mysql8.0版本。如果是mysql5.7可跳过这步 (1)将lib中的驱动器替换成mysql-connector-java-8.0.22.jar

                              (2)修改驱动器权限

                                chmod 777 lib/mysql-connector-java-8.0.22.jar
                                chmod +st lib/mysql-connector-java-8.0.22.jar

                                (3)修改后权限如图所示

                                  ll lib
                                  # 删除原来的5.x 版本的驱动器
                                  rm -rf lib/mysql-connecter-java-5.1.48.jar
                                  在这里插入图片描述

                                  5、启动服务


                                    ./bin/startup.sh

                                    6、deployer是后台启动的,通过查看日志检查是否正常启动

                                      cat logs/canal/canal.log

                                      发现如下日志即正常启动

                                      在这里插入图片描述


                                      4.1.1

                                      安装服务端canal-deployer


                                      1、在conf/example/instance.properties中修改

                                        # 全量同步
                                        canal.instance.master.journal.name=mysql-bin.000001
                                        canal.instance.master.position=0
                                        #2019-01-01 00:00:00 上一次更新的时间
                                        canal.instance.master.timestamp=1546272000000

                                        2、如果之前同步过,想要重新做全量同步,那么需要删除con/example/meta.dat文件,这个文件会记录上次同步的时间和binlog位置


                                        4.2

                                        安装客户端canal-adapter


                                        1、解压

                                          cd /var/local/canal/adapter
                                          tar -zxvf canal.adapter-1.1.5.tar.gz

                                          2、修改配置文件

                                            vim conf/application.yml

                                            修改内容

                                              server:
                                              port: 8081
                                              spring:
                                              jackson:
                                              date-format: yyyy-MM-dd HH:mm:ss
                                              time-zone: GMT+8
                                              default-property-inclusion: non_null


                                              canal.conf:
                                              mode: tcp #tcp kafka rocketMQ rabbitMQ
                                              flatMessage: true
                                              zookeeperHosts:
                                              syncBatchSize: 1000
                                              retries: 0
                                              timeout:
                                              accessKey:
                                              secretKey:
                                              consumerProperties:
                                              # canal tcp consumer
                                              canal.tcp.server.host: 127.0.0.1:11111
                                              canal.tcp.zookeeper.hosts:
                                              canal.tcp.batch.size: 500
                                              canal.tcp.username:
                                              canal.tcp.password:
                                              # kafka consumer
                                              kafka.bootstrap.servers: 127.0.0.1:9092
                                              kafka.enable.auto.commit: false
                                              kafka.auto.commit.interval.ms: 1000
                                              kafka.auto.offset.reset: latest
                                              kafka.request.timeout.ms: 40000
                                              kafka.session.timeout.ms: 30000
                                              kafka.isolation.level: read_committed
                                              kafka.max.poll.records: 1000
                                              # rocketMQ consumer
                                              rocketmq.namespace:
                                              rocketmq.namesrv.addr: 127.0.0.1:9876
                                              rocketmq.batch.size: 1000
                                              rocketmq.enable.message.trace: false
                                              rocketmq.customized.trace.topic:
                                              rocketmq.access.channel:
                                              rocketmq.subscribe.filter:
                                              # rabbitMQ consumer
                                              rabbitmq.host:
                                              rabbitmq.virtual.host:
                                              rabbitmq.username:
                                              rabbitmq.password:
                                              rabbitmq.resource.ownerId:
                                              srcDataSources:
                                              defaultDS:
                                              url: jdbc:mysql://172.16.188.1:3306/bladex?useUnicode=true
                                              #driverClassName: com.mysql.cj.jdbc.Driver
                                              username: root
                                              password: 123456
                                              canalAdapters:
                                              - instance: example # canal instance Name or mq topic name
                                              groups:
                                              - groupId: g1
                                              outerAdapters:
                                              - name: logger
                                              -
                                              key: esKey
                                              name: es7 # es6 or es7
                                              hosts: http://172.16.188.7:9200 # 集群地址,逗号隔开. 127.0.0.1:9200 for rest mode or 127.0.0.1:9300 for transport mode
                                              properties:
                                              mode: rest # rest or transport
                                              # security.auth: test:123456 # only used for rest mode
                                              cluster.name: cluster1

                                              3、修改es配置文件 查看conf/es7
                                              文件夹会发现里面默认有3个yml文件

                                                ll conf/es7

                                                如果需要配置mysql到es的同步,那么就需要在es路径下配置字段的映射,adapter默认会加载es路径下的所有yml文件。一个配置文件表示一张表的mapping,我们删除默认的yml,创建一个user.yml用于user表的同步

                                                  vim conf/es/user.yml

                                                  内容

                                                    dataSourceKey: defaultDS # 这里的key与上述application.yml中配置的数据源保持一致
                                                    outerAdapterKey: esKey # 与上述application.yml中配置的outerAdapters.key一直
                                                    destination: example # 默认为example,与application.yml中配置的instance保持一致
                                                    groupId:
                                                    esMapping:
                                                    _index: user
                                                    _type: _doc
                                                    _id: id
                                                    sql: "select t.id as id, t.code, t.email,t.real_name as realName,t.role_id as roleId,t.post_id as postId,t.dept_id as deptId from blade_user t"
                                                    etlCondition: "where t.update_time>='{0}'"
                                                    commitBatch: 3000

                                                    4、解决druid 包冲突,我下载的1.1.5版本针对es7.x连接有一个druid冲突报错,所以需要下载源码并且重新编译 或者你可以直接使用我编译好的:下载地址[4]

                                                    未来不知道会不会修复这个问题,你可以先启动看看是否报如下错误

                                                      com.alibaba.druid.pool.DruidDataSource cannot be cast to com.alibaba.druid.pool.DruidDataSource

                                                      (1)修改client-adapter/escore/pom.xml

                                                        <dependency>
                                                        <groupId>com.alibaba</groupId>
                                                        <artifactId>druid</artifactId>
                                                        <!--add by whx 20220112-->
                                                        <scope>provided</scope>
                                                        </dependency>

                                                        (2)重新打包

                                                        在这里插入图片描述

                                                        (3)将client-adapter/es7x/target/client-adapter.es7x-1.1.5-jar-with-dependencies.jar上传到服务器,替换adataper/plugin下的同名jar文件

                                                        在这里插入图片描述


                                                          scp client-adapter.es7x-1.1.5-jar-with-dependencies.jar root@172.16.188.2:/var/local/canal/adapter/plugin

                                                          (4)给该文件赋权

                                                            chmod 777 /var/local/canal/adapter/plugin/client-adapter.es7x-1.1.5-jar-with-dependencies.jar

                                                            5、启动服务

                                                              ./bin/startup.sh

                                                              同理通过查看日志来看是否启动成功

                                                                cat logs/adapter/adapter.log

                                                                6、启动后,修改数据库数据,查看日志会发现已经有输出了(这里将部分敏感数据处理了)

                                                                  cat logs/adapter/adapter.log
                                                                  在这里插入图片描述

                                                                  7、在kibana查询,数据也已经同步到es了

                                                                  在这里插入图片描述


                                                                  4.2.1

                                                                  binlog未开启前的历史数据如何同步?


                                                                  因为canal是基于binlog实现全量同步的,那么未开启binlog之前的历史数据就无法被同步,我们可以通过logstash、业务代码、datax或者其他同步工具来实现真正的全量同步。但是只使用canal也不是不可以实现完全的全量同步

                                                                  方案就是将数据库中的数据导出,然后再重新导入一遍,这样就可以生成binlog。但是要注意的是如果是生产环境,需要注意控制导出量,谨防导出卡死,最好选择在凌晨,用户使用较少的时间段导出、导入

                                                                  当然也可以参考下方链接实现全量同步

                                                                  快来和小伙伴们一起学习更多干货


                                                                  文章转载自Elasticsearch之家,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                                                                  评论