由于项目需求,用Flink生成完实时数据之后,需要立即给前台做展示,且日志项目实时数据数量较大,因此采用了Redis作为数据库,用来存储最近一小段时间的数据,供前台来进行实时展示。
Flink的Sink方法提供了一个redis的sink包(flink-connector-redis_2.11),但是使用过程中发现与业务不符合。主要是2个方面:
Flink提供的包入redis时不能设置失效时间,业务场景是只需要保存最新的实时数据一段时间,需要设置失效时间,让数据过期,从而提高资源的利用和性能。
业务使用的是Redis集群模式,Flink提供的Jar包虽然支持集群但是没有设置密码的地方,业务的Redis集群都有设置密码的强制要求。
对Flink提供RedisSink包进行改造,具体的是:1.增加可以设置失效时间的方法;2.集群模式提供Redis的验证方法。
1、增加缺失的方法
由于原来的RedisMapper只提供了获取入库方式、获取key以及获取Value,三个接口,没有提供获取失效时间的接口。

所以首先重新定义RedisMapper接口类,增加一个获取失效时间的接口。

其次在RedisCommand中增加一个带失效时间的命令
SETEX(BasicRedisDataType.STRING)接口既然以及改好了,就要去重新定义它的实现了。

接着在RedisSink的invoke方法中提供获取失效时间的代码,以及SETNX命令的实现。

由于原来的RedisCommandsContainer接口中不包含失效时间的方法,所以需要新增一个含失效时间的接口。

在具体的RedisClusterContainer集群实现该方法。

这样第一个问题就解决了,只要将设置为RedisCommand.SETEX就可以进行带失效时间的Sink方法了。
2、增加缺失的参数
关于增加设置密码的方式,Flink提供的包中JedisClusterConfig是缺失了集群密码设置的。因此需要添加一个密码选项,并提供set方法。

然后初始化的时候增加一个构造方法。

其实最终还是调用的原生redis连接包来创建了一个集群对象redis.clients.jedis.JedisCluster,在RedisCommandsContainerBuilder中添加带密码的redis集群创建方式。

通过这样改造就拥有了设置密码的方式,如果还少了别的参数同理可以通过这样的方式给添加上去。为了灵活改造我们实际直接将整个jar拉取了下来,重新定义了一个属于自己的Sink定制包,最终使用如下:


简要的给出类图

本文主要通过结合项目的实际使用场景,在flink与redis的sink包(flink-connector-redis_2.11)不能满足要求的情况下,在原有包的基础上根据实际使用情况,对该包进行了一系列的改造,主要包括提供设置密码的redis集群初始化以及在sink过程中可以指定redis键值的失效时长,最终通过改造的flink-connector-redis在项目flink任务中使用良好。




