前言 安装 K8S 安装 Flink 安装 Dinky 整库同步 附录
GitHub 地址 
一、前言
二、安装 K8S
如果是本地测试的话,可以起 minikube
如果是生产的话,可以使用 Rancher RKE2 配合 Rancher 使用,也可以用别的商用 K8S 方案(比如 阿里云 ACK,腾讯云 TKE 等)
三、安装 Flink
安装 Flink Operator 到 K8S
本文假设你已经会一些最基本的 k8s 知识,比如
k8s 集群配置了加速器(flink,dlink,doris,hadoop相关镜像没一个小的,不用加速器,得慢死)
kubectl 的使用,
~/.kube/config
,如何安装
helm
,
如果不会,请自行搜索相关知识。
# 安装 cert-manager 必选kubectl create -f https://github.com/jetstack/cert-manager/releases/download/v1.8.2/cert-manager.yaml# 可以将 flink-kubernetes-operator-1.1.0 换成别的版本,具体以 https://downloads.apache.org/flink/ 列出为准helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.1.0/# 安装 flink-kubernetes-operator 到 k8s 集群 (--namespace 可以缩写 -n,不写默认装到 default 集群,如果命名空间不存在,可以加上 --create-namespace ,在安装时创建命名空间)helm install flink-kubernetes-operator --create-namespace --namespace flink flink-operator-repo/flink-kubernetes-operator --set image.repository=apache/flink-kubernetes-operator
参考 flink-kubernetes-operator quick-start
安装 Flink Session 集群到 K8S
apiVersion: flink.apache.org/v1beta1kind: FlinkDeploymentmetadata:name: flink-sessionspec:# 官方镜像少 jar ,我自己打的镜像,只用于演示本文,实际生产请自行构建镜像# Flink CDC 目前只支持 flink 1.14.* ,暂不支持 1.15.*image: anjia0532/flink:1.14.5-scala_2.12-java8-5# Flink 版本改成 1.14.*flinkVersion: v1_14flinkConfiguration:taskmanager.numberOfTaskSlots: "2"serviceAccount: flinkjobManager:resource:memory: "2048m"cpu: 1taskManager:resource:memory: "2048m"cpu: 1
Flink CDC 目前只支持 flink 1.14.* ,暂不支持 1.15.*
参考 flink-cdc-connectors 支持的 Flink 版本
# 安装 到 flink 命名空间kubectl -n flink apply -f flink-session-only.yaml
会自动创建 Flink Session Deployment(部署) 和 对应的 Service (服务发现 )

参考 flink-kubernetes-operator examples
四、安装 Dinky 到 K8S
---apiVersion: apps/v1kind: Deploymentmetadata:labels:app: flink-dlinkname: dlinkspec:selector:matchLabels:app: flink-dlinktemplate:metadata:labels:app: flink-dlinkspec:containers:- image: anjia0532/dlink:v0.6.6-1name: dlinkvolumeMounts:- mountPath: opt/dlink/config/application.ymlname: admin-configsubPath: application.ymlvolumes:- configMap:name: dlink-configname: admin-config---apiVersion: v1kind: ConfigMapmetadata:name: dlink-configdata:application.yml: |-spring:datasource:url: jdbc:mysql://mysql-headless.mysql:3306/dlink?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=trueusername: dlinkpassword: dlinkdriver-class-name: com.mysql.cj.jdbc.Driverapplication:name: dlink# flyway:# enabled: false# clean-disabled: true## baseline-on-migrate: true# table: dlink_schema_history# Redis配置#sa-token如需依赖redis,请打开redis配置和pom.xml、dlink-admin/pom.xml中依赖# redis:# host: localhost# port: 6379# password:# database: 10# jedis:# pool:# # 连接池最大连接数(使用负值表示没有限制)# max-active: 50# # 连接池最大阻塞等待时间(使用负值表示没有限制)# max-wait: 3000# # 连接池中的最大空闲连接数# max-idle: 20# # 连接池中的最小空闲连接数# min-idle: 5# # 连接超时时间(毫秒)# timeout: 5000server:port: 8888mybatis-plus:mapper-locations: classpath:/mapper/*Mapper.xml#实体扫描,多个package用逗号或者分号分隔typeAliasesPackage: com.dlink.modelglobal-config:db-config:id-type: autoconfiguration:##### mybatis-plus打印完整sql(只适用于开发环境)# log-impl: org.apache.ibatis.logging.stdout.StdOutImpllog-impl: org.apache.ibatis.logging.nologging.NoLoggingImpl# Sa-Token 配置sa-token:# token名称 (同时也是cookie名称)token-name: satoken# token有效期,单位s 默认10小时, -1代表永不过期timeout: 36000# token临时有效期 (指定时间内无操作就视为token过期) 单位: 秒activity-timeout: -1# 是否允许同一账号并发登录 (为true时允许一起登录, 为false时新登录挤掉旧登录)is-concurrent: false# 在多人登录同一账号时,是否共用一个token (为true时所有登录共用一个token, 为false时每次登录新建一个token)is-share: true# token风格token-style: uuid# 是否输出操作日志is-log: false---apiVersion: v1kind: Servicemetadata:name: flink-dlinkspec:ipFamilies:- IPv4ipFamilyPolicy: SingleStackports:- name: httpport: 8888protocol: TCPtargetPort: 8888selector:app: dlinktype: ClusterIP
注意修改 ConfigMap 里的 dlink 链接的 MySQL 的地址,用户名,密码,以及执行github.com/DataLinkDC/… 里的 dlink.sql (第一次执行) 和 dlinkmysqlcatalog.sql(第一次执行),如果是已经存在了,只是要升级,执行 dlink_history.sql
kubectl -n flink apply -f dlink.yaml
可以使用 Idea 里的 Kubernates, Nocalhost, 或者 VS Code 里的 Kubernates, Nocalhost 或者命令行程序 k9s 或者kubectl 把 dlink 和 flink job UI 的端口转出来。
五、整库同步
EXECUTE CDCSOURCE cdc_mysql1 WITH ('connector' = 'mysql-cdc','hostname' = 'mysql-headless.mysql','port' = '3306','username' = 'dlink','password' = 'dlink','checkpoint' = '3000','scan.startup.mode' = 'initial','parallelism' = '1','table-name' = 'dlink\..*','sink.url' = 'jdbc:mysql://mysql-headless.mysql:3306/cdc-test?characterEncoding=utf-8&useSSL=false','sink.username' = 'dlink','sink.password' = 'dlink','sink.connector' = 'jdbc','sink.sink.db' = 'cdc-test','sink.table.prefix' = '','sink.table.lower' = 'true','sink.table-name' = '${tableName}','sink.driver' = 'com.mysql.jdbc.Driver','sink.sink.buffer-flush.interval' = '2s','sink.sink.buffer-flush.max-rows' = '100','sink.sink.max-retries' = '5')
打开 Dlink Web 添加 K8S Session 集群,添加 作业,复制 CDCSource SQL 保存,并执行、或者异步提交。

打开 Flink UI 看看执行情况。

六、附录
Dlink docker file
FROM flink:1.14.5-scala_2.12-java8 as builderFROM openjdk:8-jdkARG DLINK_VERSION="0.6.6"WORKDIR opt/dlinkADD https://github.com/DataLinkDC/dlink/releases/download/v${DLINK_VERSION}/dlink-release-${DLINK_VERSION}.tar.gz tmp/dlink.tar.gz#COPY ./dlink-release-${DLINK_VERSION}.tar.gz tmp/dlink.tar.gzRUN tar zxf tmp/dlink.tar.gz -C opt/dlink --strip-components=1 && mkdir -p opt/dlink/plugins/ && rm -rf tmp/*ADD https://maven.aliyun.com/repository/central/ru/yandex/clickhouse/clickhouse-jdbc/0.2.6/clickhouse-jdbc-0.2.6.jar opt/dlink/plugins/ADD https://maven.aliyun.com/repository/central/mysql/mysql-connector-java/8.0.22/mysql-connector-java-8.0.22.jar opt/dlink/plugins/ADD https://maven.aliyun.com/repository/central/com/ververica/flink-sql-connector-mysql-cdc/2.2.1/flink-sql-connector-mysql-cdc-2.2.1.jar opt/dlink/plugins/ADD https://maven.aliyun.com/repository/central/org/apache/flink/flink-connector-jdbc_2.12/1.14.5/flink-connector-jdbc_2.12-1.14.5.jar opt/dlink/lib/#ADD ./flink-shaded-hadoop-3-uber-3.1.1.7.2.9.0-173-9.0.jar opt/dlink/plugins/COPY --from=builder opt/flink/lib/* /opt/dlink/plugins/RUN cp /opt/dlink/extends/dlink-client-1.14-0.6.6.jar /opt/dlink/lib/RUN cp /opt/dlink/extends/dlink-catalog-mysql-1.14-0.6.6.jar /opt/dlink/lib/RUN cp /opt/dlink/extends/dlink-connector-jdbc-1.14-0.6.6.jar /opt/dlink/lib/RUN rm -rf /opt/dlink/lib/dlink-client-1.13-0.6.6.jar && rm -rf /opt/dlink/lib/dlink-catalog-mysql-1.13-0.6.6.jar && rm -rf /opt/dlink/lib/dlink-connector-jdbc-1.13-0.6.6.jarCMD [ "/bin/sh", "-c", "java -Dloader.path=./lib,./plugins -Ddruid.mysql.usePingMethod=false -jar -Xms512M -Xmx2048M ./dlink-admin-*.jar" ]
docker build . -f Dockerfile-flink -t anjia0532/flink:1.14.5-scala_2.12-java8-1docker push anjia0532/flink:1.14.5-scala_2.12-java8-1
MySQL 表名查询 SQL
SELECT GROUP_CONCAT(CONCAT(table_schema,"\\.",table_name))from `information_schema`.`TABLES` WHERE table_schema='dlink';## 结果为 dlink\.dlink_alert_group,dlink\.dlink_alert_history,dlink\.dlink_alert_instance,dlink\.dlink_catalogue,dlink\.dlink_cluster,dlink\.dlink_cluster_configuration,dlink\.dlink_database,dlink\.dlink_flink_document,dlink\.dlink_history,dlink\.dlink_jar,dlink\.dlink_job_history,dlink\.dlink_job_instance,dlink\.dlink_savepoints,dlink\.dlink_schema_history,dlink\.dlink_sys_config,dlink\.dlink_task,dlink\.dlink_task_statement,dlink\.dlink_task_version,dlink\.dlink_user,dlink\.metadata_column,dlink\.metadata_database,dlink\.metadata_database_property,dlink\.metadata_function,dlink\.metadata_table,dlink\.metadata_table_propertySELECT GROUP_CONCAT( DISTINCT CONCAT(table_schema,"\\..*") ORDER BY table_schema )from `information_schema`.`TABLES`;## 结果为 canal_manager\..*,cdc-test\..*,datax_web\..*,dlink\..*,information_schema\..*,mysql\..*,performance_schema\..*,sys\..*
交流
欢迎您加入社区交流分享与批评,也欢迎您为社区贡献自己的力量。
QQ社区群:543709668,申请备注 “ Dinky+企业名+职位”,不写不批。
微信官方群(推荐):添加 wenmo_ai ,申请备注“ Dinky+企业名+职位”,不写不批谢谢。
钉钉社区群(推荐):



扫描二维码获取
更多精彩
Dinky开源





