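Contents: Scenario, Component preparation, Deployment, Writing to Redis in real time with FlinkCDC, Joining a Kafka stream with a Redis dimension table, Summary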
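I. Scenario
1. Use Flink CDC to synchronize data from a MySQL database to Redis in real time;
2. Use FlinkSQL to join a Kafka data stream with dimension data stored in Redis.
II. Component preparation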
| Component | Version |
| --- | --- |
| Flink | 1.14.4 |
| Flink-mysql-cdc | 2.2.1 |
| Redis | 6.2.4 |
| MySQL | 5.7+ |
| Dinky | 0.6.6 |
| commons-pool2 | 2.11.0 |
| jedis | 3.3.0 |
| flink-connector-redis | 1.0.11 |
| flink-sql-connector-kafka | 2.12-1.14.4 |
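Tip
The flink-connector-redis jar depends on commons-pool2 and jedis, which is why both are listed here. Both can be downloaded from the Maven repository, or you can build them from source. The jedis version the connector was compiled against is fairly old and newer RC packages exist, so to be safe you should compile flink-connector-redis yourself before deploying to production.
GitHub: https://github.com/jeff-zou/flink-connector-redis (many thanks to jeff-zou for contributing this connector!)
III. Deployment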

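IV. Writing to Redis in real time with FlinkCDC
Preparing the source database
Prepare the data to be synchronized: the tables employees_1 and employees_2 in the database emp_1.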
CREATE DATABASE IF NOT EXISTS emp_1;
-- Switch to the new database so that the tables are created inside emp_1.
USE emp_1;

CREATE TABLE IF NOT EXISTS `employees_1` (
  `emp_no` int(11) NOT NULL,
  `birth_date` date NOT NULL,
  `first_name` varchar(50) NOT NULL,
  `last_name` varchar(50) NOT NULL,
  `gender` enum('M','F') NOT NULL,
  `hire_date` date NOT NULL,
  PRIMARY KEY (`emp_no`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

INSERT INTO employees_1 VALUES (10, '1992-09-12', 'cai', 'kunkun', 'M', '2022-09-22');
INSERT INTO employees_1 VALUES (11, '1990-09-15', 'wang', 'meimei', 'F', '2021-04-12');

CREATE TABLE IF NOT EXISTS `employees_2` (
  `emp_no` int(11) NOT NULL,
  `birth_date` date NOT NULL,
  `first_name` varchar(50) NOT NULL,
  `last_name` varchar(50) NOT NULL,
  `gender` enum('M','F') NOT NULL,
  `hire_date` date NOT NULL,
  PRIMARY KEY (`emp_no`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

INSERT INTO employees_2 VALUES (20, '1987-01-23', 'huang', 'menji', 'M', '2000-03-04');
INSERT INTO employees_2 VALUES (21, '1993-04-21', 'lu', 'benweiniubi', 'M', '2022-05-06');
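The Flink SQL job in the next step captures both tables through a single 'table-name' pattern, employees_[0-9]+. As a rough sanity check, the sketch below uses Python's re module to show which table names such a pattern would select. It assumes the pattern has to match the whole table name, and Python's regex dialect only stands in for the matching done by the connector. Every candidate name other than employees_1 and employees_2 is hypothetical.

```python
import re

# Pattern copied from the 'table-name' option of the mysql-cdc source table.
TABLE_NAME_PATTERN = re.compile(r"employees_[0-9]+")


def captured_tables(table_names):
    """Return the names the pattern selects, assuming a whole-name match."""
    return [name for name in table_names if TABLE_NAME_PATTERN.fullmatch(name)]


# employees_1 and employees_2 come from the article; the others are hypothetical.
candidates = ["employees_1", "employees_2", "employees_bak", "employees_", "dept_1"]
selected = captured_tables(candidates)
print(selected)
```

Under these assumptions, a table added later with a numeric suffix would also fall under the same pattern, while names without one would not.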
FlinkSQL
SET execution.checkpointing.interval = 10s;
SET execution.checkpointing.tolerable-failed-checkpoints = 3;
SET execution.checkpointing.timeout = 300s;
SET execution.checkpointing.externalized-checkpoint-retention = RETAIN_ON_CANCELLATION;
SET execution.checkpointing.mode = EXACTLY_ONCE;
SET execution.checkpointing.unaligned = true;
SET pipeline.operator-chaining = false;

CREATE TABLE employees_source (
  table_name STRING METADATA VIRTUAL,
  emp_no int NOT NULL,
  birth_date date,
  first_name STRING,
  last_name STRING,
  gender STRING,
  hire_date date,
  PRIMARY KEY (`emp_no`) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '127.0.0.1',
  'port' = '3306',
  'username' = 'root',
  'password' = '000000',
  'database-name' = 'emp_1',
  'table-name' = 'employees_[0-9]+'
);

CREATE TABLE sink_redis (
  hsetKey VARCHAR,
  hsetField VARCHAR,
  hsetValue VARCHAR
) WITH (
  'connector' = 'redis',
  'redis-mode' = 'single',
  'host' = 'hadoop102',
  'port' = '6379',
  'maxTotal' = '100',
  'maxIdle' = '5',
  'minIdle' = '5',
  'sink.parallelism' = '1',
  'sink.max-retries' = '3',
  'command' = 'hset'
);

CREATE VIEW temp_view (hsetKey, hsetField, hsetValue) AS
SELECT
  table_name AS hsetKey,
  CAST(emp_no AS STRING) AS hsetField,
  '{"' || 'birth_date' || '":"' || CAST(birth_date AS STRING) || '",' ||
  '"' || 'first_name' || '":"' || first_name || '",' ||
  '"' || 'last_name' || '":"' || last_name || '",' ||
  '"' || 'gender' || '":"' || gender || '",' ||
  '"' || 'hire_date' || '":"' || CAST(hire_date AS STRING) || '"}' AS hsetValue
FROM employees_source;

INSERT INTO sink_redis SELECT hsetKey, hsetField, hsetValue FROM temp_view;
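Notes on the SQL:
'{"' || 'birth_date' || '":"' || CAST(birth_date as STRING) || '",' ||
'"' || 'first_name' || '":"' || first_name || '",' ||
'"' || 'last_name' || '":"' || last_name || '",' ||
'"' || 'gender' || '":"' || gender || '",' ||
'"' || 'hire_date' || '":"' || CAST(hire_date as STRING) || '"}' as hsetValue
The expression above simply concatenates the column names and values of the table into a JSON string, which is then used as the value of the Redis hset. It is only a demonstration and has no other significance. Flink ships built-in JSON construction functions only from version 1.15 onward, so on 1.14 the string has to be assembled with ||. The Redis connector supports many more commands and configuration options; see its GitHub repository for the details.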
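To make the shape of that string concrete, here is a small Python sketch that mimics the || concatenation for one of the sample rows and compares it with a JSON serializer. It is an illustration only, not part of the Flink job: the function names are made up, and the row containing a double quote is a hypothetical input chosen to show that plain concatenation does not escape anything.

```python
import json

# Column order used by the concatenation in the view.
FIELDS = ["birth_date", "first_name", "last_name", "gender", "hire_date"]


def concat_json(row):
    """Mimic the || concatenation: quote every value, escape nothing."""
    pairs = ['"' + name + '":"' + str(row[name]) + '"' for name in FIELDS]
    return "{" + ",".join(pairs) + "}"


def safe_json(row):
    """Build the same object with a serializer that escapes special characters."""
    return json.dumps({name: str(row[name]) for name in FIELDS}, separators=(",", ":"))


row = {
    "birth_date": "1992-09-12",
    "first_name": "cai",
    "last_name": "kunkun",
    "gender": "M",
    "hire_date": "2022-09-22",
}
print(concat_json(row))

# Hypothetical value containing a double quote: concatenation yields invalid JSON.
tricky = dict(row, last_name='kun"kun')
try:
    json.loads(concat_json(tricky))
    concat_is_valid = True
except ValueError:
    concat_is_valid = False
print(concat_is_valid, json.loads(safe_json(tricky))["last_name"])
```

For the sample data both approaches produce the same string, so the concatenation is adequate for this demo; the difference only matters once values can contain quotes or backslashes.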

Submitting the per-job task

FlinkWebUI

As the screenshot above shows, Dinky has successfully submitted the streaming job to the remote cluster.
Resulting data



Modifying data
Change the last_name value kunkun to momoda.

The Flink Web UI shows the change flowing through the job.

Check the data in Redis: the value has been updated.

Adding data
Insert one more row into the table.
INSERT INTO employees_1 VALUES (12, '1996-06-16', 'dili', 'reba', 'F', '2000-07-25');
Redis now contains the new entry as well.
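The update and the insert above both end up as hset writes against the same hash, which is why the modified row replaces the old value while the new row simply appears as another field. The sketch below models that behaviour with an in-memory dictionary standing in for Redis. It does not talk to Redis or Flink, the helper names are made up for illustration, and it says nothing about how the connector treats deleted rows.

```python
import json

# In-memory stand-in for Redis hashes: {hash key: {field: value}}.
store = {}


def hset(key, field, value):
    """Overwrite semantics of HSET: the last write for a field wins."""
    store.setdefault(key, {})[field] = value


def sync_row(table_name, row):
    """Map a source row to (hsetKey, hsetField, hsetValue) the way the view does."""
    payload = {name: value for name, value in row.items() if name != "emp_no"}
    hset(table_name, str(row["emp_no"]), json.dumps(payload, separators=(",", ":")))


base = {"emp_no": 10, "birth_date": "1992-09-12", "first_name": "cai",
        "last_name": "kunkun", "gender": "M", "hire_date": "2022-09-22"}

sync_row("employees_1", base)                              # initial snapshot
sync_row("employees_1", dict(base, last_name="momoda"))    # the UPDATE
sync_row("employees_1", {"emp_no": 12, "birth_date": "1996-06-16", "first_name": "dili",
                         "last_name": "reba", "gender": "F", "hire_date": "2000-07-25"})  # the INSERT

fields = sorted(store["employees_1"])
print(fields, json.loads(store["employees_1"]["10"])["last_name"])
```

Because the hash field is the primary key emp_no, replaying the same change twice leaves the hash in the same state.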

V. Joining a Kafka stream with a Redis dimension table
Data to send to Kafka
{"company_id":"employees_1", "emp_id":"10", "log_id":"log_a_001"}
FlinkSQL
SET execution.checkpointing.interval = 10s;
SET execution.checkpointing.tolerable-failed-checkpoints = 3;
SET execution.checkpointing.timeout = 300s;
SET execution.checkpointing.externalized-checkpoint-retention = RETAIN_ON_CANCELLATION;
SET execution.checkpointing.mode = EXACTLY_ONCE;
SET execution.checkpointing.unaligned = true;
SET pipeline.operator-chaining = false;
-- SET restart-strategy = fixed-delay;               -- restart strategy
-- SET restart-strategy.fixed-delay.attempts = 5;    -- number of restart attempts
-- SET restart-strategy.fixed-delay.delay = 30s;     -- fixed delay between attempts

CREATE TABLE kafka_source (
  company_id string,
  emp_id string,
  log_id string,
  event_time AS PROCTIME()
) WITH (
  'connector' = 'kafka',
  'topic' = 'test',
  'properties.bootstrap.servers' = '127.0.0.1:9092',
  'properties.group.id' = 'test',
  'properties.partition-discovery.interval-millis' = '30000',
  'format' = 'json',
  'scan.startup.mode' = 'latest-offset'
);

CREATE TABLE dim_redis (
  hsetKey VARCHAR,
  hsetField VARCHAR,
  hsetValue VARCHAR
) WITH (
  'connector' = 'redis',
  'redis-mode' = 'single',
  'host' = 'hadoop102',
  'port' = '6379',
  'lookup.cache.max-rows' = '5000',
  'lookup.cache.ttl' = '300',
  'maxTotal' = '100',
  'maxIdle' = '5',
  'minIdle' = '5',
  'sink.parallelism' = '1',
  'sink.max-retries' = '3',
  'command' = 'hget'
);

CREATE TABLE sink_table (
  company_id varchar,
  emp_id varchar,
  log_id varchar,
  event_time timestamp,
  first_name varchar,
  last_name varchar,
  hire_date varchar
) WITH (
  'connector' = 'print'
);

CREATE VIEW temp_view AS
SELECT
  k.company_id AS company_id,
  k.emp_id AS emp_id,
  k.log_id AS log_id,
  k.event_time AS event_time,
  JSON_VALUE(d.hsetValue, '$.first_name') AS first_name,
  JSON_VALUE(d.hsetValue, '$.last_name') AS last_name,
  JSON_VALUE(d.hsetValue, '$.hire_date') AS hire_date
FROM kafka_source AS k
LEFT JOIN dim_redis FOR SYSTEM_TIME AS OF k.event_time AS d
  ON k.company_id = d.hsetKey
  AND k.emp_id = d.hsetField;

INSERT INTO sink_table SELECT * FROM temp_view;
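Conceptually, the lookup join above does the following for every Kafka record: read the hash named by company_id, fetch the field named by emp_id, and pull three attributes out of the stored JSON, leaving them empty when nothing is found (left join). The Python sketch below walks through that logic with a dictionary standing in for Redis. It is a simplified model rather than what Flink executes: the function names are made up, the event_time column is left out, and the record with emp_id 99 is a hypothetical example of a missing dimension row.

```python
import json

# Stand-in for the hash written by the CDC job (sample row after the update to momoda).
dim_redis = {
    "employees_1": {
        "10": '{"birth_date":"1992-09-12","first_name":"cai","last_name":"momoda",'
              '"gender":"M","hire_date":"2022-09-22"}',
    }
}


def hget(key, field):
    """Return the stored value, or None when the key or field does not exist."""
    return dim_redis.get(key, {}).get(field)


def enrich(event):
    """Left-join one Kafka record with the dimension data."""
    raw = hget(event["company_id"], event["emp_id"])
    dim = json.loads(raw) if raw is not None else {}
    return {
        "company_id": event["company_id"],
        "emp_id": event["emp_id"],
        "log_id": event["log_id"],
        "first_name": dim.get("first_name"),
        "last_name": dim.get("last_name"),
        "hire_date": dim.get("hire_date"),
    }


matched = enrich(json.loads('{"company_id":"employees_1", "emp_id":"10", "log_id":"log_a_001"}'))
# Hypothetical record whose employee is not present in Redis.
unmatched = enrich({"company_id": "employees_1", "emp_id": "99", "log_id": "log_a_002"})
print(matched)
print(unmatched)
```

The second record is still emitted, only with empty dimension columns, which is the behaviour that distinguishes the left join from an inner join.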

FlinkWebUI

The stream-dimension join succeeds and the result is printed to the console.

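Notes
The parameter pipeline.operator-chaining is set to false only for temporary testing, so that every operator is visible in the job graph. Setting it to false is not recommended for business workloads.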
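One more point worth keeping in mind: the dim_redis table sets 'lookup.cache.max-rows' and 'lookup.cache.ttl'. A cache of this kind saves Redis round trips, but a changed dimension value may not be visible to the join until the cached entry expires. The sketch below illustrates that trade-off with a tiny cache. It is not the connector's implementation: the class name, the eviction policy (oldest inserted entry first) and the interpretation of the TTL as seconds are all assumptions made for the example.

```python
from collections import OrderedDict


class LookupCache:
    """Tiny TTL cache with a row limit (illustrative only)."""

    def __init__(self, max_rows, ttl_seconds, clock):
        self.max_rows = max_rows
        self.ttl_seconds = ttl_seconds
        self.clock = clock              # injected so the example needs no real waiting
        self.entries = OrderedDict()    # key -> (value, expires_at)

    def get(self, key, loader):
        now = self.clock()
        hit = self.entries.get(key)
        if hit is not None and hit[1] > now:
            return hit[0]               # still fresh: no call to the backend
        value = loader(key)
        self.entries.pop(key, None)     # drop an expired entry before re-inserting
        self.entries[key] = (value, now + self.ttl_seconds)
        if len(self.entries) > self.max_rows:
            self.entries.popitem(last=False)  # evict the oldest inserted entry
        return value


now = [0]
backend = {("employees_1", "10"): "kunkun"}


def load(key):
    return backend[key]


cache = LookupCache(max_rows=5000, ttl_seconds=300, clock=lambda: now[0])
key = ("employees_1", "10")

first = cache.get(key, load)    # miss: loaded from the backend
backend[key] = "momoda"         # the dimension value changes
stale = cache.get(key, load)    # hit: the old value is still served
now[0] = 301                    # move past the TTL
fresh = cache.get(key, load)    # expired: reloaded from the backend
print(first, stale, fresh)
```

If the join must always see the latest dimension value, a shorter TTL or no cache at all is the safer choice, at the cost of more lookups.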
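VI. Summary
Strengths
As a one-stop FlinkSQL development platform, Dinky combines development, monitoring, resource management and many other functions, and is very convenient to use. It removes many of the pain points of day-to-day business development, and its built-in OpenAPI lets it work together with other scheduling platforms to cover a complete real-time and offline development workflow. As the saying goes: use your tools well and you get to go home early!
Shortcomings
Custom jar dependencies used during development have to be placed manually in a designated location, but in production you may not have permission to access that location. It would help to have an option in the UI where you click Add, select a local package, and have it synchronized to the designated location automatically, or to be able to declare dependencies in pom format and have them integrated into the job automatically.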
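Community
You are welcome to join the community to exchange ideas, share experience and offer criticism, and you are equally welcome to contribute to the community yourself.
QQ community group: 543709668. Put "Dinky + company name + job title" in the request note; requests without it will not be approved.
Official WeChat group (recommended): add wenmo_ai and put "Dinky + company name + job title" in the request note; requests without it will not be approved.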