spring-boot-starter-data-redis
原生并不支持读写分离;因此需要我们去手动封装实现;实现思路:1.通过AOP来实现;2.封装工具类,我们在工具类里面实现;3.由使用者自己去处理。
这里我们通过再次封装redisTemplate类来实现,简化调用者的工作。
由于需要读写分离,因此我们需要自定义配置来实现。代码如下:
Redis自定义属性配置RedisReadWriteConfig
@Data@Component@ConfigurationProperties(prefix = "spring.redis")public class RedisReadWriteConfig implements Serializable {/*** 主机地址*/@Value("${spring.redis.host}")private String host;/*** 认证密码*/@Value("${spring.redis.password:#{null}}")private String password;/*** 端口号*/@Value("${spring.redis.port:6379}")private int port = 6379;/*** 数据库编号*/@Value("${spring.redis.database:0}")private int database;/*** 连接超时时间,单位毫秒*/@Value("${spring.redis.timeout:3000}")private long timeout;/*** 关闭超时时间,单位毫秒*/@Value("${spring.redis.lettuce.shutdown-timeout:200}")private long shutdownTimeout;/*** 连接池中的最小空闲连接*/@Value("${spring.redis.lettuce.pool.min-idle:1}")private int minIdle;/*** 连接池中的最大空闲连接*/@Value("${spring.redis.lettuce.pool.max-idle:6}")private int maxIdle = 6;/*** 连接池最大连接数(使用负值表示没有限制,不要配置过大,否则可能会影响redis的性能)*/@Value("${spring.redis.lettuce.pool.max-active:10}")private int maxActive = 10;/*** 连接池最大阻塞等待时间(使用负值表示没有限制),单位毫秒*/@Value("${spring.redis.lettuce.pool.max-wait:1000}")private long maxWait = 1000;/*** redis只读库配置*/private List<RedisReadConfig> redisReadConfigs;@Data@Validated@AllArgsConstructor@NoArgsConstructorstatic class RedisReadConfig {/*** 主机地址*/@NotEmptyprivate String host;/*** 认证密码*/private String password;/*** 端口号*/private int port = 6379;/*** 数据库编号*/private int database = 0;}/*** 只读实例配置** @return 返回所有数据读取的配置*/public List<RedisStandaloneConfiguration> buildReadRedisStandaloneConfiguration() {if (enableReadWriteModel()) {redisReadConfigs = redisReadConfigs.stream().distinct().collect(Collectors.toList());List<RedisStandaloneConfiguration> list = new ArrayList<>(redisReadConfigs.size());for(RedisReadConfig readConfig : redisReadConfigs){list.add(createRedisStandaloneConfiguration(readConfig));}return list;}return null;}/*** 写实例配置** @return 返回所有数据读取的配置*/public RedisStandaloneConfiguration buildWriteRedisStandaloneConfiguration() {RedisReadConfig redisServerConfig = new RedisReadConfig();redisServerConfig.setHost(this.host);redisServerConfig.setPort(this.port);redisServerConfig.setPassword(this.password);redisServerConfig.setDatabase(this.database);return createRedisStandaloneConfiguration(redisServerConfig);}/*** 是否启动读写分离模式** @return 启用返回true;否则false*/public boolean enableReadWriteModel(){return redisReadConfigs != null && !redisReadConfigs.isEmpty();}private RedisStandaloneConfiguration createRedisStandaloneConfiguration(RedisReadConfig redisServerConfig) {RedisStandaloneConfiguration redisStandaloneConfiguration = new RedisStandaloneConfiguration();// 连接地址redisStandaloneConfiguration.setHostName(redisServerConfig.getHost());// 认证密码redisStandaloneConfiguration.setPassword(redisServerConfig.getPassword());// 端口号,默认6379redisStandaloneConfiguration.setPort(redisServerConfig.getPort());// 数据库编号redisStandaloneConfiguration.setDatabase(redisServerConfig.getDatabase());return redisStandaloneConfiguration;}}
配置示例
server:port: 8088spring:redis:# 地址host: 192.168.56.102# 端口号port: 6379# 密码password: 123456# 超时时间,单位毫秒timeout: 3000# 数据库编号database: 0# 配置lettucelettuce:pool:# 连接池中的最小空闲连接min-idle: 1# 连接池中的最大空闲连接max-idle: 6# 连接池最大连接数(使用负值表示没有限制,不要配置过大,否则可能会影响redis的性能)max-active: 10# 连接池最大阻塞等待时间(使用负值表示没有限制);单位毫秒max-wait: 1000#关闭超时时间;单位毫秒shutdown-timeout: 200# redis只读库配置redis-read-configs:- host: 192.168.56.104port: 6379password: 123456- host: 192.168.56.105port: 6379password: 123456
自定义redis连接工厂ReadWriteLettuceConnectionFactory
由于我们有多个只读库,为了实现动态切换,我们自己是实现一个工厂,方便后面操作
@Slf4j@Componentpublic class ReadWriteLettuceConnectionFactory implements DisposableBean {private final LettuceConnectionFactory writeConnectionFactory;private final List<LettuceConnectionFactory> readConnectionFactories = new ArrayList<>();private final AtomicInteger pos = new AtomicInteger();private int max = -1;public ReadWriteLettuceConnectionFactory(RedisReadWriteConfig readWriteConfig) {this.writeConnectionFactory = createLettuceConnectionFactory(readWriteConfig, readWriteConfig.buildWriteRedisStandaloneConfiguration());Assert.notNull(writeConnectionFactory, "redis config can not null, if don't used please remove dependence redis jar.");if (readWriteConfig.enableReadWriteModel()) {List<RedisStandaloneConfiguration> list = readWriteConfig.buildReadRedisStandaloneConfiguration();if(null!=list){for(RedisStandaloneConfiguration rsc:list){LettuceConnectionFactory connectionFactory = createLettuceConnectionFactory(readWriteConfig, rsc);if(connectionFactory!=null){log.info("redis-read-datasource - load a datasource [{}:{}] success!", rsc.getHostName(), rsc.getPort());readConnectionFactories.add(connectionFactory);max++;}else {log.warn("redis-read-datasource - load a datasource [{}:{}] fail!", rsc.getHostName(), rsc.getPort());}}}else {log.warn("found read redis config, but can't load a datasource fail!");}}}/*** 获取读连接池* @return 返回连接工厂*/public LettuceConnectionFactory getReadFactory() {// 简单的负载均衡:轮询方案if(pos.get()>max){pos.set(0);}int index = pos.getAndIncrement();log.info("chose redis-read-datasource index is [{}]", pos);return getReadFactory(index);}private LettuceConnectionFactory getReadFactory(int index) {if(index>max){log.warn("no suitable redis-read-datasource [{}], the max {}.", index, max);// 发生错误自动切换到写连接上去return writeConnectionFactory;}return readConnectionFactories.get(index);}/*** 获取写连接池* @return 返回连接工厂*/public LettuceConnectionFactory getWriteFactory() {return writeConnectionFactory;}/*** 创建Lettuce连接工厂** @param readWriteConfig redis配置* @param redisStandaloneConfiguration redis独立配置* @return 返回连接工厂*/private LettuceConnectionFactory createLettuceConnectionFactory(RedisReadWriteConfig readWriteConfig, RedisStandaloneConfiguration redisStandaloneConfiguration) {if (redisStandaloneConfiguration == null) {return null;}// 连接池配置GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();// 连接池中的最小空闲连接poolConfig.setMinIdle(readWriteConfig.getMinIdle());// 连接池中的最大空闲连接poolConfig.setMaxIdle(readWriteConfig.getMaxIdle());// 连接池最大连接数(使用负值表示没有限制,不要配置过大,否则可能会影响redis的性能)poolConfig.setMaxTotal(readWriteConfig.getMaxActive());// 连接池最大阻塞等待时间(使用负值表示没有限制)poolConfig.setMaxWaitMillis(readWriteConfig.getMaxWait());LettucePoolingClientConfiguration.LettucePoolingClientConfigurationBuilder lettucePoolingClientConfigurationBuilder= LettucePoolingClientConfiguration.builder();// 连接池配置lettucePoolingClientConfigurationBuilder.poolConfig(poolConfig);// 关闭超时时间,单位毫秒lettucePoolingClientConfigurationBuilder.shutdownTimeout(Duration.ofMillis(readWriteConfig.getShutdownTimeout()));// 超时时间,单位毫秒lettucePoolingClientConfigurationBuilder.commandTimeout(Duration.ofMillis(readWriteConfig.getTimeout()));LettuceConnectionFactory lettuceConnectionFactory = new LettuceConnectionFactory(redisStandaloneConfiguration, lettucePoolingClientConfigurationBuilder.build());lettuceConnectionFactory.afterPropertiesSet();return lettuceConnectionFactory;}@Overridepublic void destroy() throws Exception {writeConnectionFactory.destroy();if(!readConnectionFactories.isEmpty()){for(LettuceConnectionFactory connectionFactory:readConnectionFactories){connectionFactory.destroy();}readConnectionFactories.clear();}log.info("redis-datasource all closed success.");}}
重写RedisTemplate类实现多个只读库的动态切换
为了实现多个只读库能够自动的切换和轮询,需要对原来的RedisTemplate类做些升级,通过查看其源码发现每次执行相关操作前都会调用getRequiredConnectionFactory()
这个方法;基于此我们通过继承RedisTemplate
然后重写其getRequiredConnectionFactory()
方法来实现只读库的切换。
public class ReadWriteRedisTemplate<K, V> extends RedisTemplate<K, V> {private ReadWriteLettuceConnectionFactory readWriteConnectionFactory;private boolean isRead = false;/*** RedisTemplate每次执行方法时都会调用这个方法;如果只有1读1写,那么就没有必要再弄这个封装类,直接在创建的时候指定即可** @return RedisConnectionFactory*/@Overridepublic RedisConnectionFactory getRequiredConnectionFactory() {return getFactory();}public void setReadWriteConnectionFactory(ReadWriteLettuceConnectionFactory readWriteConnectionFactory, boolean isRead) {this.isRead = isRead;this.readWriteConnectionFactory = readWriteConnectionFactory;setConnectionFactory(getFactory());}private RedisConnectionFactory getFactory(){if(this.isRead){return this.readWriteConnectionFactory.getReadFactory();}return this.readWriteConnectionFactory.getWriteFactory();}}
配置RedisTemplate的注入bean
@Configurationpublic class RedisCachingConfigurer<K, V> extends CachingConfigurerSupport {/*** 读数据的RedisTemplate** @param factory LettuceConnectionFactory*/@Bean(name = "readRedisTemplate")public RedisTemplate<K, V> readRedisTemplate(ReadWriteLettuceConnectionFactory factory) {return redisTemplate(factory, true);}/*** 写数据的RedisTemplate** @param factory LettuceConnectionFactory*/@Bean(name = "writeRedisTemplate")public RedisTemplate<K, V> writeRedisTemplate(ReadWriteLettuceConnectionFactory factory) {return redisTemplate(factory, false);}private RedisTemplate<K, V> redisTemplate(ReadWriteLettuceConnectionFactory factory, boolean isRead) {//创建Redis缓存操作助手RedisTemplate对象ReadWriteRedisTemplate<K, V> template = new ReadWriteRedisTemplate<K, V>();template.setReadWriteConnectionFactory(factory, isRead);//设置key的序列化方式template.setKeySerializer(keySerializer());template.setHashKeySerializer(keySerializer());//将RedisTemplate的Value序列化方式由JdkSerializationRedisSerializer更换为Jackson2JsonRedisSerializertemplate.setValueSerializer(valueSerializer());template.setHashValueSerializer(valueSerializer());template.afterPropertiesSet();return template;}private RedisSerializer<String> keySerializer() {return new StringRedisSerializer();}private RedisSerializer<Object> valueSerializer() {Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<Object>(Object.class);ObjectMapper om = new ObjectMapper();// 指定要序列化的域,field,get和set,以及修饰符范围,ANY是都有包括private和publicom.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);// 指定序列化输入的类型,类必须是非final修饰的,final修饰的类,比如String,Integer等会抛出异常om.activateDefaultTyping(LaissezFaireSubTypeValidator.instance, ObjectMapper.DefaultTyping.NON_FINAL);//解决时间序列化问题om.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);om.registerModule(new JavaTimeModule());jackson2JsonRedisSerializer.setObjectMapper(om);return jackson2JsonRedisSerializer;}}
使用示例
@Autowiredprivate RedisTemplate<String, Student> writeRedisTemplate;@Autowiredprivate RedisTemplate<String, Student> readRedisTemplate;/*** 由使用者自己判断应该是读还是写;注意只读的只能做读操作*/@Overridepublic Student crud1() {// 获取数据Student val = this.readRedisTemplate.opsForValue().get("student-rw1:1");if (null == val) {log.info("缓存中数据不存在");val = Student.create();// 添加数据this.writeRedisTemplate.opsForValue().set("student-rw1:1", val);Student student2 = Student.create();student2.setId(2L);this.writeRedisTemplate.opsForValue().set("student-rw1:2", student2);this.writeRedisTemplate.opsForList().leftPush("student_list-rw1", val);this.writeRedisTemplate.opsForList().leftPush("student_list-rw1", student2);} else {// 删除数据this.writeRedisTemplate.delete("student-rw1:2");log.info("删除缓存");}log.info(JSON.toJSONString(val));// 获取列表数据List<Student> list = this.readRedisTemplate.opsForList().range("student_list-rw1", 0, 3);log.info(JSON.toJSONString(list));return val;}
封装工具类
从上面的例子上我们发现手动切换不是太方便,因此这里我封装一个帮助类
@Componentpublic class RedisHelper {private final RedisTemplate<String, String> writeRedisTemplate;private final RedisTemplate<String, String> readRedisTemplate;public RedisHelper(RedisTemplate<String, String> writeRedisTemplate, RedisTemplate<String, String> readRedisTemplate) {this.writeRedisTemplate = writeRedisTemplate;this.readRedisTemplate = readRedisTemplate;}/*** 设置值** @param key 缓存key* @param value 值* @param <T> Class* @return 返回操作结果*/public <T> boolean set(String key, T value) {if (value instanceof String) {return set(key, (String) value);}return set(key, JSON.toJSONString(value));}/*** 设置值** @param key 缓存key* @param value 值* @param validTime 缓存时间,单位秒* @param <T> Class* @return 返回操作结果*/public <T> boolean set(String key, T value, long validTime) {if (value instanceof String) {return set(key, (String) value, validTime);}return set(key, JSON.toJSONString(value), validTime);}/*** 设置值** @param key 缓存key* @param value 值* @param validTime 缓存时间,单位秒* @return 返回操作结果*/private boolean set(String key, String value, long validTime) {Boolean res = this.writeRedisTemplate.execute((RedisCallback<Boolean>) connection -> {RedisSerializer<String> serializer = this.writeRedisTemplate.getStringSerializer();byte[] keyByte = Objects.requireNonNull(serializer.serialize(key));byte[] valueByte = Objects.requireNonNull(serializer.serialize(value));connection.set(keyByte, valueByte);connection.expire(keyByte, validTime);return true;});return res != null && res;}/*** 设置某个值的缓存时间** @param key 缓存key* @param validTime 缓存时间,单位秒* @return 返回操作结果*/public boolean setExpire(String key, long validTime) {Boolean res = this.writeRedisTemplate.execute((RedisCallback<Boolean>) connection -> {RedisSerializer<String> serializer = this.writeRedisTemplate.getStringSerializer();byte[] keyByte = Objects.requireNonNull(serializer.serialize(key));connection.expire(keyByte, validTime);return true;});return res != null && res;}/*** 设置值** @param key 缓存key* @param value 值* @return 返回操作结果*/private boolean set(String key, String value) {Boolean res = this.writeRedisTemplate.execute((RedisCallback<Boolean>) connection -> {RedisSerializer<String> serializer = this.writeRedisTemplate.getStringSerializer();connection.set(Objects.requireNonNull(serializer.serialize(key)), Objects.requireNonNull(serializer.serialize(value)));return true;});return res != null && res;}/*** 获取值** @param key 缓存key* @param clazz 反序列的Class* @param <T> T* @return 返回单个结果*/public <T> T get(String key, Class<T> clazz) {return JSON.parseObject(getValue(key), clazz);}/*** 获取值** @param key 缓存key* @param clazz 反序列的Class* @param <T> T* @return 返回List*/public <T> List<T> getList(String key, Class<T> clazz) {return JSONArray.parseArray(getValue(key), clazz);}/*** 获取值** @param key 缓存key* @return 返回单个结果*/public String get(String key) {return getValue(key);}/*** 获取值** @param key 缓存key* @return 返回单个结果*/private String getValue(String key) {return this.readRedisTemplate.execute((RedisCallback<String>) connection -> {RedisSerializer<String> serializer = this.readRedisTemplate.getStringSerializer();byte[] value = connection.get(Objects.requireNonNull(serializer.serialize(key)));return serializer.deserialize(value);});}/*** 删除值** @param key 缓存key*/public void del(String key) {this.writeRedisTemplate.delete(key);}/*** 批量删除相同前缀的key** @param prefix 前缀*/public void batchDel(String prefix) {Set<String> keys = keys(prefix);if (null != keys && !keys.isEmpty()) {this.writeRedisTemplate.delete(keys);}}/*** 批量删除** @param keys 需要删除的key*/public void batchDel(Collection<String> keys) {this.writeRedisTemplate.delete(keys);}/*** 判断值缓存key是否存在** @param key 缓存key*/public boolean exist(String key) {Boolean res = this.writeRedisTemplate.hasKey(key);return res != null && res;}/*** 获取相同前缀的key** @param prefix 前缀*/public Set<String> keys(String prefix) {return this.readRedisTemplate.keys(prefix + "*");}/*** 如果key不存在则设置,此方法使用了redis的原子性** @param key key* @param value value* @param validTime 缓存时间; 若要是使用锁,建议设置有效时间,避免死锁* @return key不存在设置成功返回true; 否则返回false*/public boolean setNx(String key, String value, long validTime) {return setNx(key, value, validTime, TimeUnit.SECONDS);}/*** 如果key不存在则设置,此方法使用了redis的原子性** @param key key* @param value value* @param validTime 缓存时间; 若要是使用锁,建议设置有效时间,避免死锁* @param timeUnit 时间单位* @return key不存在设置成功返回true; 否则返回false*/public boolean setNx(String key, String value, long validTime, TimeUnit timeUnit) {try {ValueOperations<String, String> operations = this.writeRedisTemplate.opsForValue();Boolean lock = operations.setIfAbsent(key, value, validTime, timeUnit);return lock != null && lock;} catch (Exception e) {this.del(key);e.printStackTrace();}return false;}}
使用示例
@Autowiredprivate RedisHelper redisHelper;/*** 通过工具类自动切换*/@Overridepublic Student crud2() {// 获取数据Student val = this.redisHelper.get("student-rw2:1", Student.class);if (null == val) {log.info("缓存中数据不存在");val = Student.create();// 添加数据this.redisHelper.set("student-rw2:1", val);Student student2 = Student.create();student2.setId(2L);this.redisHelper.set("student-rw2:2", student2);List<Student> list = Lists.newArrayList(val, student2);this.redisHelper.set("student_list-rw2", list);} else {// 删除数据this.redisHelper.del("student-rw2:2");log.info("删除缓存");}log.info(JSON.toJSONString(val));// 获取列表数据List<Student> list = this.redisHelper.getList("student_list-rw2", Student.class);log.info(JSON.toJSONString(list));return val;}

文章转载自起岸星辰,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。




