暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Dinky on k8s 整库同步实践

Dinky开源 2022-08-18
300
摘要:本文介绍了安家老师带来的的 Dinky 在 K8S 上进行整库同步的实践分享。内容包括:
  1. 前言
  2. 安装 K8S
  3. 安装 Flink
  4. 安装 Dinky
  5. 整库同步
  6. 附录


Tips:历史传送门
Dinky 实践系列之 Flink Catalog 元数据管理
Dinky实践系列之FlinkCDC整库实时入仓入湖
Dinky FlinkCDC 整库入仓 StarRocks
Dinky 构建 Flink CDC 整库入仓入湖
 

 GitHub 地址 
https://github.com/DataLinkDC/dlink
https://gitee.com/DataLinkDC/Dinky
欢迎大家关注 Dinky 的发展~


一、前言

    本文主要讲解如何在 K8S 集群跑 Dlink+Flink 通过 Flink CDC 进行整库同步。




二、安装 K8S

    如果是本地测试的话,可以起 minikube

    如果是生产的话,可以使用 Rancher RKE2 配合 Rancher 使用,也可以用别的商用 K8S 方案(比如 阿里云 ACK,腾讯云 TKE 等)




三、安装 Flink


安装 Flink Operator 到 K8S

    本文假设你已经会一些最基本的 k8s 知识,比如

  • k8s 集群配置了加速器(flink,dlink,doris,hadoop相关镜像没一个小的,不用加速器,得慢死)

  • kubectl 的使用,~/.kube/config
    ,

  • 如何安装 helm
    ,

如果不会,请自行搜索相关知识。

    # 安装 cert-manager 必选
    kubectl create -f https://github.com/jetstack/cert-manager/releases/download/v1.8.2/cert-manager.yaml


    # 可以将 flink-kubernetes-operator-1.1.0 换成别的版本,具体以 https://downloads.apache.org/flink/ 列出为准
    helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.1.0/


    # 安装 flink-kubernetes-operator 到 k8s 集群 (--namespace 可以缩写 -n,不写默认装到 default 集群,如果命名空间不存在,可以加上 --create-namespace ,在安装时创建命名空间)
    helm install flink-kubernetes-operator --create-namespace --namespace flink flink-operator-repo/flink-kubernetes-operator --set image.repository=apache/flink-kubernetes-operator

        参考 flink-kubernetes-operator quick-start


    安装 Flink Session 集群到 K8S

      apiVersion: flink.apache.org/v1beta1
      kind: FlinkDeployment
      metadata:
      name: flink-session
      spec:
      # 官方镜像少 jar ,我自己打的镜像,只用于演示本文,实际生产请自行构建镜像
      # Flink CDC 目前只支持 flink 1.14.* ,暂不支持 1.15.*
      image: anjia0532/flink:1.14.5-scala_2.12-java8-5
      # Flink 版本改成 1.14.*
      flinkVersion: v1_14
      flinkConfiguration:
      taskmanager.numberOfTaskSlots: "2"
      serviceAccount: flink
      jobManager:
      resource:
      memory: "2048m"
      cpu: 1
      taskManager:
      resource:
      memory: "2048m"
      cpu: 1

          Flink CDC 目前只支持 flink 1.14.* ,暂不支持 1.15.*

          参考 flink-cdc-connectors 支持的 Flink 版本

        # 安装 到 flink 命名空间
        kubectl -n flink apply -f flink-session-only.yaml

            会自动创建 Flink Session Deployment(部署) 和 对应的 Service (服务发现 )

            参考 flink-kubernetes-operator examples




        四、安装 Dinky 到 K8S


          ---
          apiVersion: apps/v1
          kind: Deployment
          metadata:
          labels:
          app: flink-dlink
          name: dlink
          spec:
          selector:
          matchLabels:
          app: flink-dlink
          template:
          metadata:
          labels:
          app: flink-dlink
          spec:
          containers:
          - image: anjia0532/dlink:v0.6.6-1
          name: dlink
          volumeMounts:
          - mountPath: opt/dlink/config/application.yml
          name: admin-config
          subPath: application.yml
          volumes:
          - configMap:
          name: dlink-config
          name: admin-config


          ---
          apiVersion: v1
          kind: ConfigMap
          metadata:
          name: dlink-config
          data:
          application.yml: |-
          spring:
          datasource:
          url: jdbc:mysql://mysql-headless.mysql:3306/dlink?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true
          username: dlink
          password: dlink
          driver-class-name: com.mysql.cj.jdbc.Driver
          application:
          name: dlink
          # flyway:
          # enabled: false
          # clean-disabled: true
          ## baseline-on-migrate: true
          # table: dlink_schema_history
          # Redis配置
          #sa-token如需依赖redis,请打开redis配置和pom.xml、dlink-admin/pom.xml中依赖
          # redis:
          # host: localhost
          # port: 6379
          # password:
          # database: 10
          # jedis:
          # pool:
          # # 连接池最大连接数(使用负值表示没有限制)
          # max-active: 50
          # # 连接池最大阻塞等待时间(使用负值表示没有限制)
          # max-wait: 3000
          # # 连接池中的最大空闲连接数
          # max-idle: 20
          # # 连接池中的最小空闲连接数
          # min-idle: 5
          # # 连接超时时间(毫秒)
          # timeout: 5000
          server:
          port: 8888


          mybatis-plus:
          mapper-locations: classpath:/mapper/*Mapper.xml
          #实体扫描,多个package用逗号或者分号分隔
          typeAliasesPackage: com.dlink.model
          global-config:
          db-config:
          id-type: auto
          configuration:
          ##### mybatis-plus打印完整sql(只适用于开发环境)
          # log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
          log-impl: org.apache.ibatis.logging.nologging.NoLoggingImpl


          # Sa-Token 配置
          sa-token:
          # token名称 (同时也是cookie名称)
          token-name: satoken
          # token有效期,单位s 默认10小时, -1代表永不过期
          timeout: 36000
          # token临时有效期 (指定时间内无操作就视为token过期) 单位: 秒
          activity-timeout: -1
          # 是否允许同一账号并发登录 (为true时允许一起登录, 为false时新登录挤掉旧登录)
          is-concurrent: false
          # 在多人登录同一账号时,是否共用一个token (为true时所有登录共用一个token, 为false时每次登录新建一个token)
          is-share: true
          # token风格
          token-style: uuid
          # 是否输出操作日志
          is-log: false
          ---
          apiVersion: v1
          kind: Service
          metadata:
          name: flink-dlink
          spec:
          ipFamilies:
          - IPv4
          ipFamilyPolicy: SingleStack
          ports:
          - name: http
          port: 8888
          protocol: TCP
          targetPort: 8888
          selector:
          app: dlink
          type: ClusterIP

              注意修改 ConfigMap 里的 dlink 链接的 MySQL 的地址,用户名,密码,以及执行github.com/DataLinkDC/… 里的  dlink.sql (第一次执行) 和 dlinkmysqlcatalog.sql(第一次执行),如果是已经存在了,只是要升级,执行  dlink_history.sql

            kubectl -n flink apply -f dlink.yaml

                可以使用 Idea 里的 Kubernates, Nocalhost, 或者 VS Code 里的 Kubernates, Nocalhost 或者命令行程序 k9s 或者kubectl 把 dlink 和 flink job UI 的端口转出来。




            五、整库同步

                需要确保 dlink 所在的 MySQL 的 my.conf 开了 binglog ,并且 格式为 ROW。演示 整库同步 dlink 的所有表同步到 cdc-test 中。在 dlink 的 MySQL 数据库中创建一个名为 cdc-test 的库。
              EXECUTE CDCSOURCE cdc_mysql1 WITH (
              'connector' = 'mysql-cdc',
              'hostname' = 'mysql-headless.mysql',
              'port' = '3306',
              'username' = 'dlink',
              'password' = 'dlink',
              'checkpoint' = '3000',
              'scan.startup.mode' = 'initial',
              'parallelism' = '1',
              'table-name' = 'dlink\..*',
              'sink.url' = 'jdbc:mysql://mysql-headless.mysql:3306/cdc-test?characterEncoding=utf-8&useSSL=false',
              'sink.username' = 'dlink',
              'sink.password' = 'dlink',
              'sink.connector' = 'jdbc',
              'sink.sink.db' = 'cdc-test',
              'sink.table.prefix' = '',
              'sink.table.lower' = 'true',
              'sink.table-name' = '${tableName}',
              'sink.driver' = 'com.mysql.jdbc.Driver',
              'sink.sink.buffer-flush.interval' = '2s',
              'sink.sink.buffer-flush.max-rows' = '100',
              'sink.sink.max-retries' = '5'
              )

                  打开 Dlink Web 添加 K8S Session 集群,添加 作业,复制 CDCSource SQL 保存,并执行、或者异步提交。

                  打开 Flink UI 看看执行情况。




              六、附录


              Dlink docker file

                FROM flink:1.14.5-scala_2.12-java8 as builder


                FROM openjdk:8-jdk


                ARG DLINK_VERSION="0.6.6"


                WORKDIR opt/dlink


                ADD https://github.com/DataLinkDC/dlink/releases/download/v${DLINK_VERSION}/dlink-release-${DLINK_VERSION}.tar.gz tmp/dlink.tar.gz


                #COPY ./dlink-release-${DLINK_VERSION}.tar.gz tmp/dlink.tar.gz


                RUN tar zxf tmp/dlink.tar.gz -C opt/dlink --strip-components=1 && mkdir -p opt/dlink/plugins/ && rm -rf tmp/*


                ADD https://maven.aliyun.com/repository/central/ru/yandex/clickhouse/clickhouse-jdbc/0.2.6/clickhouse-jdbc-0.2.6.jar opt/dlink/plugins/
                ADD https://maven.aliyun.com/repository/central/mysql/mysql-connector-java/8.0.22/mysql-connector-java-8.0.22.jar opt/dlink/plugins/
                ADD https://maven.aliyun.com/repository/central/com/ververica/flink-sql-connector-mysql-cdc/2.2.1/flink-sql-connector-mysql-cdc-2.2.1.jar opt/dlink/plugins/
                ADD https://maven.aliyun.com/repository/central/org/apache/flink/flink-connector-jdbc_2.12/1.14.5/flink-connector-jdbc_2.12-1.14.5.jar opt/dlink/lib/


                #ADD ./flink-shaded-hadoop-3-uber-3.1.1.7.2.9.0-173-9.0.jar opt/dlink/plugins/


                COPY --from=builder opt/flink/lib/* /opt/dlink/plugins/


                RUN cp /opt/dlink/extends/dlink-client-1.14-0.6.6.jar /opt/dlink/lib/
                RUN cp /opt/dlink/extends/dlink-catalog-mysql-1.14-0.6.6.jar /opt/dlink/lib/
                RUN cp /opt/dlink/extends/dlink-connector-jdbc-1.14-0.6.6.jar /opt/dlink/lib/




                RUN rm -rf /opt/dlink/lib/dlink-client-1.13-0.6.6.jar && rm -rf /opt/dlink/lib/dlink-catalog-mysql-1.13-0.6.6.jar && rm -rf /opt/dlink/lib/dlink-connector-jdbc-1.13-0.6.6.jar


                CMD [ "/bin/sh", "-c", "java -Dloader.path=./lib,./plugins -Ddruid.mysql.usePingMethod=false -jar -Xms512M -Xmx2048M ./dlink-admin-*.jar" ]

                  docker build . -f Dockerfile-flink -t anjia0532/flink:1.14.5-scala_2.12-java8-1
                  docker push anjia0532/flink:1.14.5-scala_2.12-java8-1

                  MySQL 表名查询 SQL

                    SELECT GROUP_CONCAT(CONCAT(table_schema,"\\.",table_name)) 
                    from `information_schema`.`TABLES` WHERE table_schema='dlink';


                    ## 结果为 dlink\.dlink_alert_group,dlink\.dlink_alert_history,dlink\.dlink_alert_instance,dlink\.dlink_catalogue,dlink\.dlink_cluster,dlink\.dlink_cluster_configuration,dlink\.dlink_database,dlink\.dlink_flink_document,dlink\.dlink_history,dlink\.dlink_jar,dlink\.dlink_job_history,dlink\.dlink_job_instance,dlink\.dlink_savepoints,dlink\.dlink_schema_history,dlink\.dlink_sys_config,dlink\.dlink_task,dlink\.dlink_task_statement,dlink\.dlink_task_version,dlink\.dlink_user,dlink\.metadata_column,dlink\.metadata_database,dlink\.metadata_database_property,dlink\.metadata_function,dlink\.metadata_table,dlink\.metadata_table_property




                    SELECT GROUP_CONCAT( DISTINCT CONCAT(table_schema,"\\..*") ORDER BY table_schema )
                    from `information_schema`.`TABLES`;


                    ## 结果为 canal_manager\..*,cdc-test\..*,datax_web\..*,dlink\..*,information_schema\..*,mysql\..*,performance_schema\..*,sys\..*




                    交流

                    欢迎您加入社区交流分享与批评,也欢迎您为社区贡献自己的力量。

                    QQ社区群:543709668,申请备注 “ Dinky+企业名+职位”,不写不批。

                    微信官方群(推荐):添加 wenmo_ai ,申请备注“ Dinky+企业名+职位”,不写不批谢谢。

                    钉钉社区群(推荐):

                           公众号:Dinky开源



                    扫描二维码获取

                    更多精彩

                    Dinky开源




                    文章转载自Dinky开源,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                    评论