当前的客户端代理只负责代理多个单节点的 Redis,并且只针对写操作。
1. 在执行订单扣减库存业务时,不同一级分类的 sku 存储在不同的 Redis 节点上,扣减时需要由底层自动完成 Redis 路由。这种简单的路由功能用不着 Codis 之类的 Redis 代理,因此编写了一个基于 Dubbo 的 Redis 代理。
不同的redisTemplate绑定不同的connectionFactory;配置类如下:
@Data
public class RedisConfig {//每一个RedisConfig对应一个Redis server,一个RedisTemplate
Integer timeout;
Integer shutdownTimeOut;
Boolean autoReconnect;
Boolean pingBeforeActivateConnection;
Integer maxActive;
Integer maxIdle;
Integer minIdle;
Integer maxWait;
String host;
Integer port;
Integer database;
String password;
String serverKey; //一级分类key:订单,商品,新闻,消息,后台信息等
List<String> routeKey; //二级分类key:比如商品中的衣服类,电子设备类等,不同一级分类使用不同的标签;
StringRedisTemplate将会被代理,因为业务本身也需要对stringRedisTemplate做一些额外控制.因此设计了一个Wrapper来封装StringRedisTemplate
/**
 * Wraps one {@link StringRedisTemplate} together with the availability flags the proxy
 * consults before routing a command to that node.
 *
 * <p>Getters and setters are generated by Lombok's {@code @Data}.
 */
@Data
public class StringRedisTemplateWrapper {
    /** Address of the Redis node. */
    String netAddr;
    /** Node state: 0 means running normally, -1 means the node must not be used. */
    AtomicInteger status;
    /** The wrapped template. */
    StringRedisTemplate stringRedisTemplate;
    /**
     * Per-business-key state. Data on one Redis node can belong to several businesses;
     * an entry says whether that business is currently allowed to be served (0 = allowed).
     */
    Map<String, AtomicInteger> secondLevelStatusMap;

    /**
     * Creates a wrapper around the given template with a fresh status counter (initially 0)
     * and an empty, concurrent second-level status map. {@code netAddr} is left unset.
     *
     * @param stringRedisTemplate the template to wrap
     * @return the new wrapper
     */
    public static StringRedisTemplateWrapper build(StringRedisTemplate stringRedisTemplate) {
        StringRedisTemplateWrapper wrapped = new StringRedisTemplateWrapper();
        wrapped.setSecondLevelStatusMap(new ConcurrentHashMap<>());
        wrapped.setStringRedisTemplate(stringRedisTemplate);
        wrapped.setStatus(new AtomicInteger());
        return wrapped;
    }

    /**
     * Pre-execution hook; currently it only delegates to {@link #checkStatus(String)}.
     * (A chain of responsibility would be a better fit here.)
     *
     * @param secondLevelKey the business key about to be served
     * @return the result of {@link #checkStatus(String)}
     */
    public int beforeExecute(String secondLevelKey) {
        return checkStatus(secondLevelKey);
    }

    /**
     * Decides whether a command for {@code secondLevelKey} may run on this node.
     *
     * @param secondLevelKey the business key to check
     * @return 0 when execution is allowed, -2 when the node itself is marked -1,
     *         -1 when the key is not registered or its state is not 0
     */
    public int checkStatus(String secondLevelKey) {
        if (status.get() == -1) {
            return -2;
        }
        AtomicInteger keyState = secondLevelStatusMap.get(secondLevelKey);
        boolean allowed = keyState != null && keyState.get() == 0;
        return allowed ? 0 : -1;
    }
}
使用时,Wrapper 不会直接注入到各个 Component 中,而是统一存放在 RedisConfigure 的 map 里,便于集中管理:
package org.cinus.config;
import io.lettuce.core.ClientOptions;
import io.lettuce.core.SocketOptions;
import io.lettuce.core.TimeoutOptions;
import lombok.Data;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.cache.CacheProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.RedisPassword;
import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.connection.lettuce.LettucePoolingClientConfiguration;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.stereotype.Component;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import javax.annotation.PostConstruct;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
 * Registry of the Redis nodes this process can route to.
 *
 * <p>Node definitions are bound from the {@code myredis} configuration prefix into
 * {@link #configs}. For every node one {@link StringRedisTemplateWrapper} is kept in
 * {@link #redisTemplateMap} (keyed by {@code host:port}), and {@link #netAddrMap} records
 * which addresses serve which {@code serverKey}/{@code routeKey} combination.
 *
 * <p>{@link #buildMultiSourceStringRedisTemplate()} and
 * {@link #deleteStringRedisTemplate(String, String, String)} take the write lock;
 * {@link #buildStringRedisTemplateWithRedisConfig(RedisConfig)} does not lock by itself.
 */
@Data
@ConfigurationProperties(prefix = "myredis")
//@PropertySource("classpath:application.yml")
@Component
public class RedisConfigure {
    // Fully qualified so that no additional import is required in this file.
    private static final java.util.logging.Logger LOG =
            java.util.logging.Logger.getLogger(RedisConfigure.class.getName());

    /** Node definitions bound from configuration. */
    private List<RedisConfig> configs = new ArrayList<>();
    /** serverKey -> routeKey -> set of "ip:port" addresses. One map per dubbo process. */
    private Map<String, Map<String, Set<String>>> netAddrMap = new ConcurrentHashMap<>();
    /** "ip:port" -> wrapper around the template connected to that node. */
    private Map<String, StringRedisTemplateWrapper> redisTemplateMap = new ConcurrentHashMap<>();
    private Map<String, String> luaScriptMap = new ConcurrentHashMap<>();
    private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    /**
     * Builds a template for every entry of {@link #configs} while holding the write lock.
     *
     * <p>A config that cannot be built is logged and skipped so that one bad entry does not
     * prevent the remaining nodes from being registered. No exception is propagated.
     */
    public void buildMultiSourceStringRedisTemplate() {
        ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
        writeLock.lock();
        try {
            if (configs == null) {
                return;
            }
            for (RedisConfig config : configs) {
                try {
                    buildStringRedisTemplateWithRedisConfig(config);
                } catch (RuntimeException e) {
                    LOG.log(java.util.logging.Level.WARNING,
                            "Failed to build Redis template for " + describe(config), e);
                }
            }
        } finally {
            writeLock.unlock();
        }
    }

    /**
     * Removes one node address from the routing table under the write lock.
     *
     * <p>Only the {@link #netAddrMap} entry is removed; the wrapper stays in
     * {@link #redisTemplateMap}. Nodes that merely lost their connection are not meant to be
     * deleted: a worker marks their status and a background thread re-checks them.
     *
     * @param serverKey first-level category key
     * @param routeKey  second-level category key
     * @param host      the address to remove, in the same {@code host:port} form used as key
     *                  when the node was registered
     * @return {@code true} if the address was registered and has been removed
     */
    public boolean deleteStringRedisTemplate(String serverKey, String routeKey, String host) {
        if (serverKey == null || routeKey == null || host == null) {
            return false;
        }
        ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
        writeLock.lock();
        try {
            Map<String, Set<String>> routes = netAddrMap.get(serverKey);
            if (routes == null) {
                return false;
            }
            Set<String> addresses = routes.get(routeKey);
            return addresses != null && addresses.remove(host);
        } catch (RuntimeException e) {
            LOG.log(java.util.logging.Level.WARNING,
                    "Failed to remove " + host + " from " + serverKey + "/" + routeKey, e);
            return false;
        } finally {
            writeLock.unlock();
        }
    }

    /**
     * Registers one Redis node.
     *
     * <p>If no wrapper exists yet for {@code host:port}, a pooled Lettuce connection factory
     * and a {@link StringRedisTemplate} are created and stored. In every case the node's
     * address is recorded under each of the config's route keys, and the wrapper's
     * second-level status map gets a fresh (0 = allowed) entry for each route key and for the
     * server key.
     *
     * @param config the node definition; its {@code serverKey} must not be null
     * @throws NullPointerException if {@code config} or its {@code serverKey} is null, or if a
     *                              numeric setting needed for a new connection is missing
     */
    public void buildStringRedisTemplateWithRedisConfig(RedisConfig config) {
        String serverKey = Objects.requireNonNull(config.getServerKey(), "serverKey must not be null");
        List<String> routeKeys = config.getRouteKey() == null
                ? Collections.<String>emptyList()
                : config.getRouteKey();
        String key = config.getHost() + ":" + config.getPort();

        // Create the connection first: if that fails, nothing has been registered yet.
        // An existing wrapper is reused instead of building a throw-away factory.
        StringRedisTemplateWrapper wrapper = redisTemplateMap.get(key);
        if (wrapper == null) {
            wrapper = StringRedisTemplateWrapper.build(
                    createStringRedisTemplate(createConnectionFactory(config)));
            wrapper.setNetAddr(key);
            redisTemplateMap.put(key, wrapper);
        }

        // Business level -> category level -> node addresses. computeIfAbsent stores the
        // newly created containers; getOrDefault would leave the maps permanently empty.
        Map<String, Set<String>> routes =
                netAddrMap.computeIfAbsent(serverKey, k -> new ConcurrentHashMap<>());
        for (String routeKey : routeKeys) {
            routes.computeIfAbsent(routeKey, k -> new CopyOnWriteArraySet<>()).add(key);
        }

        // Enable the keys on the wrapper, for new and existing wrappers alike; an
        // unregistered key is rejected by StringRedisTemplateWrapper.checkStatus.
        Map<String, AtomicInteger> statusMap = wrapper.getSecondLevelStatusMap();
        for (String routeKey : routeKeys) {
            statusMap.put(routeKey, new AtomicInteger());
        }
        statusMap.put(serverKey, new AtomicInteger());
    }

    /**
     * Creates a {@link StringRedisTemplate} with String key/value serializers on top of the
     * given connection factory and initializes it so that it is ready for use.
     *
     * @param redisConnectionFactory the connection factory to bind the template to
     * @return the initialized template
     */
    public StringRedisTemplate createStringRedisTemplate(RedisConnectionFactory redisConnectionFactory) {
        StringRedisTemplate redisTemplate = new StringRedisTemplate();
        redisTemplate.setConnectionFactory(redisConnectionFactory);
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        redisTemplate.setValueSerializer(new StringRedisSerializer());
        // The template is not a Spring-managed bean, so it has to be initialized by hand.
        redisTemplate.afterPropertiesSet();
        return redisTemplate;
    }

    /** Builds and initializes a pooled Lettuce connection factory for one node. */
    private LettuceConnectionFactory createConnectionFactory(RedisConfig config) {
        // Server side.
        RedisStandaloneConfiguration configuration = new RedisStandaloneConfiguration();
        configuration.setHostName(config.getHost());
        configuration.setPort(config.getPort());
        configuration.setPassword(RedisPassword.of(config.getPassword()));
        configuration.setDatabase(config.getDatabase());

        // Client side: connection pool.
        GenericObjectPoolConfig genericObjectPoolConfig = new GenericObjectPoolConfig();
        genericObjectPoolConfig.setMaxTotal(config.getMaxActive());
        genericObjectPoolConfig.setMinIdle(config.getMinIdle());
        genericObjectPoolConfig.setMaxIdle(config.getMaxIdle());
        genericObjectPoolConfig.setMaxWaitMillis(config.getMaxWait());

        LettucePoolingClientConfiguration.LettucePoolingClientConfigurationBuilder builder =
                LettucePoolingClientConfiguration.builder();
        builder.poolConfig(genericObjectPoolConfig);
        builder.commandTimeout(Duration.ofSeconds(config.getTimeout())); // command timeout
        builder.shutdownTimeout(Duration.ofSeconds(config.getShutdownTimeOut()));
        ClientOptions clientOptions = ClientOptions.builder()
                .autoReconnect(config.getAutoReconnect())
                .pingBeforeActivateConnection(config.getPingBeforeActivateConnection())
                .build();
        builder.clientOptions(clientOptions);

        LettuceConnectionFactory factory = new LettuceConnectionFactory(configuration, builder.build());
        // The factory is not a Spring-managed bean, so it has to be initialized by hand.
        factory.afterPropertiesSet();
        return factory;
    }

    /** Describes a config for log messages without exposing its password. */
    private static String describe(RedisConfig config) {
        return config == null ? "null config" : config.getHost() + ":" + config.getPort();
    }
}
当某个 Wrapper 对应的连接断开时,其 status 会被标记为 -1;后台线程负责重新探测该节点,连接恢复后再把 status 置回 0,节点即可重新投入使用:
package org.cinus.config;
import lombok.Data;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.StringRedisTemplate;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
/**
 * A delayed health-check task for one Redis node, meant to be held in a
 * {@link java.util.concurrent.DelayQueue}.
 *
 * <p>{@link #delayTime} is the absolute time (epoch milliseconds) at which the task becomes
 * due; {@link #retryTimes} counts how often the check has been rescheduled and selects the
 * back-off from {@link #map}. Getters and setters are generated by Lombok's {@code @Data}.
 */
@Data
public class PingWorker implements Delayed {
    /** Highest retry level; the back-off does not grow beyond this level. */
    public static final Integer MAX_RETRY_TIMES=8;
    // Back-off per retry level, in milliseconds.
    public static final Long FIRST_RETRY_TIMES=500L;
    public static final Long SECOND_RETRY_TIMES=1000L;
    public static final Long THIRD_RETRY_TIMES=4000L;
    public static final Long FOUR_RETRY_TIMES=30000L;
    public static final Long FIVE_RETRY_TIMES=60_000L;
    public static final Long SIX_RETRY_TIMES=300_000L;
    public static final Long SEVEN_RETRY_TIMES=1800_000L;
    public static final Long EIGHT_RETRY_TIMES=3600_000L;
    /** Retry level -> back-off in milliseconds. */
    public static final Map<Integer,Long> map;
    static{
        map=new HashMap<>();
        map.put(0,0L);
        map.put(1,FIRST_RETRY_TIMES);
        map.put(2,SECOND_RETRY_TIMES);
        map.put(3,THIRD_RETRY_TIMES);
        map.put(4,FOUR_RETRY_TIMES);
        map.put(5,FIVE_RETRY_TIMES);
        map.put(6,SIX_RETRY_TIMES);
        map.put(7,SEVEN_RETRY_TIMES);
        map.put(8,EIGHT_RETRY_TIMES);
    }

    /** The node to check. */
    StringRedisTemplateWrapper wrapper;
    /** Absolute due time in epoch milliseconds. */
    Long delayTime;
    /** Number of times this check has been rescheduled. */
    Integer retryTimes;

    /**
     * Returns the remaining delay, converted to the requested unit.
     *
     * @param unit the unit the caller wants the delay in
     * @return the remaining delay; zero or negative once the task is due
     */
    @Override
    public long getDelay(TimeUnit unit) {
        long remainingMillis = delayTime - System.currentTimeMillis();
        // DelayQueue asks for nanoseconds; handing back raw milliseconds would make it
        // wake up far too early and spin until the task is really due.
        return unit.convert(remainingMillis, TimeUnit.MILLISECONDS);
    }

    /** Orders tasks by remaining delay, soonest first. */
    @Override
    public int compareTo(Delayed o) {
        // Long.compare avoids the overflow of casting a long difference to int.
        return Long.compare(this.getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
    }

    /**
     * Sends a PING to the wrapped node.
     *
     * @return 0 if the node answered, -1 if obtaining a connection or the PING failed
     */
    public int ping(){
        StringRedisTemplate stringRedisTemplate = wrapper.getStringRedisTemplate();
        RedisConnection connection = null;
        try {
            RedisConnectionFactory connectionFactory = stringRedisTemplate.getConnectionFactory();
            connection = connectionFactory.getConnection();
            connection.ping();
            return 0;
        } catch (Exception e) {
            return -1;
        } finally {
            // Always hand the connection back; otherwise every health check leaks one.
            if (connection != null) {
                try {
                    connection.close();
                } catch (Exception ignored) {
                    // Best effort: the PING result has already been decided.
                }
            }
        }
    }

    /**
     * Creates a task for the given node that is due immediately and has not been retried.
     *
     * @param wrapper the node to check
     * @return the new task
     */
    public static PingWorker buildPingWorker(StringRedisTemplateWrapper wrapper){
        PingWorker pingWorker = new PingWorker();
        pingWorker.setWrapper(wrapper);
        pingWorker.setRetryTimes(0);
        pingWorker.setDelayTime(0L);
        return pingWorker;
    }
}
package org.cinus.config;
import io.lettuce.core.api.reactive.RedisTransactionalReactiveCommands;
import lombok.Data;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
/**
 * Background health checker for Redis nodes that hit a command timeout.
 *
 * <p>A node reported through {@link #publishEvent(StringRedisTemplateWrapper)} is put on a
 * delay queue as a {@link PingWorker}. A single scheduler thread takes due workers from the
 * queue and pings the node: on success the wrapper's status is set to 0, on failure the
 * status is set to -1 and the worker is queued again with a longer delay.
 */
@Data
@Component
public class RedisCommandTimeoutWorker {
    /** Pending health checks, ordered by due time. */
    public static DelayQueue<PingWorker> queue=new DelayQueue<>();
    /** Set to -1 to make {@link #schedule()} leave its loop. */
    public AtomicInteger status=new AtomicInteger();
    public static ScheduledThreadPoolExecutor scheduleService=new ScheduledThreadPoolExecutor(1);

    /**
     * Queues an immediate health check for the given node. A {@code null} wrapper is ignored.
     *
     * @param wrapper the node that should be checked
     */
    public static void publishEvent(StringRedisTemplateWrapper wrapper){
        if (wrapper == null) {
            return;
        }
        queue.add(PingWorker.buildPingWorker(wrapper));
    }

    /**
     * Starts the checker loop on the scheduler after bean construction. Because
     * {@link #schedule()} loops until stopped, the fixed delay only matters when the loop
     * returns, in which case it is started again.
     */
    @PostConstruct
    public void afeterPropertiesSet(){
        scheduleService.scheduleWithFixedDelay(
                this::schedule,
                10,
                5,
                TimeUnit.MILLISECONDS
        );
    }

    /**
     * Processes due health checks until {@link #status} is -1 or the thread is interrupted.
     */
    public void schedule(){
        while(status.get()!=-1){
            PingWorker worker;
            try {
                // Blocks for up to 500 ms, so no additional sleep is needed when idle.
                worker = queue.poll(500, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                // Keep the interrupt visible and stop; there is no worker to requeue here.
                Thread.currentThread().interrupt();
                return;
            }
            if (worker == null) {
                continue;
            }
            try {
                if (worker.ping() != 0) {
                    offerQueueAgain(worker);
                } else {
                    // The node is reachable again.
                    worker.getWrapper().getStatus().set(0);
                }
            } catch (RuntimeException e) {
                offerQueueAgain(worker);
            }
        }
    }

    /**
     * Marks the worker's node as unavailable (-1) and queues the worker again with the
     * back-off of the next retry level, capped at {@link PingWorker#MAX_RETRY_TIMES}.
     * A {@code null} worker is ignored.
     *
     * @param poll the worker whose check failed
     */
    public void offerQueueAgain(PingWorker poll){
        if (poll == null) {
            return;
        }
        StringRedisTemplateWrapper wrapper = poll.getWrapper();
        if (wrapper != null && wrapper.getStatus() != null) {
            wrapper.getStatus().set(-1);
        }
        int previousRetries = poll.getRetryTimes() == null ? 0 : poll.getRetryTimes();
        // Compare as ints: == on boxed Integers compares references, not values.
        int retryTimes = Math.min(previousRetries + 1, PingWorker.MAX_RETRY_TIMES);
        Long backOff = PingWorker.map.get(retryTimes);
        long nextPingTime = System.currentTimeMillis() + (backOff == null ? 0L : backOff);
        poll.setDelayTime(nextPingTime);
        poll.setRetryTimes(retryTimes);
        queue.add(poll);
    }
}
商品扣减服务如下:基本都是用lua脚本来实现的
package org.cinus.service.impl;
import io.lettuce.core.RedisCommandTimeoutException;
import org.cinus.config.RedisCommandTimeoutWorker;
import org.cinus.config.RedisConfig;
import org.cinus.config.RedisConfigure;
import org.cinus.config.StringRedisTemplateWrapper;
import org.cinus.entity.OrderItemEntity;
import org.cinus.service.OrderService;
import org.cinus.service.RedisRouteService;
import org.cinus.vo.OrderItemVoWithOrderIdAndRedisId;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.data.redis.core.types.RedisClientInfo;
import org.springframework.stereotype.Service;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
 * Stock deduction and rollback for order items, implemented with Lua scripts that run on the
 * Redis node selected by {@link RedisRouteService}.
 *
 * <p>Stock is stored in the hash {@code sku} as {@code "<quantity>:<offset>"}.
 */
@Service
public class OrderServiceImpl implements OrderService {
    /**
     * Adds ARGV[1] back to the stock of sku KEYS[1], keeping the stored offset.
     * Returns 0 on success and -1 when the sku has no stock entry.
     */
    private static final String ROLLBACK_ORDER_DISTRIBUTED="local skuID=KEYS[1]\n" +
            "local skuNum=tonumber(ARGV[1]) or 0\n" +
            "local numStr=redis.call('hget','sku',skuID)\n" +
            "if not numStr then\n" +
            "    return -1\n" +
            "end\n" +
            "local num, offset = string.match(numStr, \"([^:]+):([^:]+)\")\n" +
            "-- 如果值是数字,可以转换\n" +
            "num = tonumber(num) or 0\n" +
            "--更新商品\n" +
            "local skuNewNum=num+skuNum\n" +
            "local skuNew=skuNewNum .. ':' .. tostring(offset)\n" +
            "redis.call('hset','sku',skuID,skuNew)\n" +
            "return 0";
    /**
     * Deducts ARGV[1] units of sku KEYS[1], records ARGV[2] (the order item id) in the
     * redo-log zset KEYS[2], and returns {@code "<redisId>:<queue>:<offset>"}.
     * Returns an empty string when the redis id or the sku is missing or stock is
     * insufficient. A missing hash field reaches Lua as {@code false}, hence the
     * {@code not x} checks.
     */
    private static final String KJ_SKU_STOCK_DISTRIBUTED="-- 查询商品数量,不够就不扣了\n" +
            "-- 查询sku对应的zset的成员分数值,对该值增1,失败就重试三次(hashmap结构,hget)\n" +
            "-- 更新zset分数值\n" +
            "-- 写zset\n" +
            "-- 更新sku值.\n" +
            "-- 查询redisId,返回\n" +
            "-- 实现为:\n" +
            "local skuNumNeed=tonumber(ARGV[1]) or 0\n" +
            "local skuId=KEYS[1]\n" +
            "local sku_redo_log_queue=KEYS[2]\n" +
            "local orderItemId=ARGV[2]\n" +
            "-- 查询redisId\n" +
            "local redisId=redis.call('hget','metadata','redis_id')\n" +
            "if not redisId or redisId==\"\" then\n" +
            "    return \"\"\n" +
            "end\n" +
            "-- 查询商品数量,不够就不扣了\n" +
            "local numStr=redis.call('hget','sku',skuId)\n" +
            "if not numStr then\n" +
            "    return \"\"\n" +
            "end\n" +
            "local key, value = string.match(numStr, \"([^:]+):([^:]+)\")\n" +
            "-- 如果值是数字,可以转换\n" +
            "key = tonumber(key) or 0\n" +
            "if key < skuNumNeed then\n" +
            "    return \"\"\n" +
            "end\n" +
            "-- 查询sku对应的zset的成员分数值,对该值增1,失败就重试三次(hashmap结构,hget)\n" +
            "local offset=redis.call('hget','queueOffset',sku_redo_log_queue)\n" +
            "-- 更新zset分数值\n" +
            "local numOffset = tonumber(offset) or 0\n" +
            "local newOffset=numOffset+1\n" +
            "redis.call('hset','queueOffset',sku_redo_log_queue,newOffset)\n" +
            "-- 写zset\n" +
            "local offsetC=tostring(redisId)..':'..tostring(sku_redo_log_queue)..':'..tostring(numOffset)\n" +
            "redis.call('zadd',sku_redo_log_queue, numOffset,orderItemId)\n" +
            "--更新商品\n" +
            "local skuNewNum=key-skuNumNeed\n" +
            "local skuNew=skuNewNum .. ':'..tostring(numOffset)\n" +
            "redis.call('hset','sku',skuId,skuNew)\n" +
            "-- 唯一确定一次库存扣减\n" +
            "return tostring(offsetC)";

    // Script objects are shared so their SHA1 is computed once. The rollback script returns
    // an integer reply, which Spring Data Redis maps to Long (Integer is not supported).
    private static final DefaultRedisScript<Long> ROLLBACK_SCRIPT =
            new DefaultRedisScript<>(ROLLBACK_ORDER_DISTRIBUTED, Long.class);
    private static final DefaultRedisScript<String> DEDUCT_SCRIPT =
            new DefaultRedisScript<>(KJ_SKU_STOCK_DISTRIBUTED, String.class);

    public static final String FIRSTLEVELKEY="order";

    @Autowired
    RedisConfigure redisConfigure;
    @Autowired
    @Qualifier("orderRedisRouteService")
    RedisRouteService redisRouteService;
    @Autowired
    ExecutorService executorService;

    /**
     * Deducts stock for all items of one order. All skus in the list are expected to live in
     * the same Redis group; the route is taken from the first item's tag.
     *
     * <p>Each item is tried on every available node until one deduction succeeds. If an item
     * cannot be deducted on any node, the items already deducted are rolled back and
     * {@code null} is returned. If a deduction throws, the items already deducted are rolled
     * back and the exception is rethrown.
     *
     * @param list the order items to deduct
     * @return order item id -> {@code "<deduction id>&<node address>"}; an empty map for a
     *         null or empty list; {@code null} if the deduction failed and was rolled back
     */
    @Override
    public Map<Long,String> deductStock(List<OrderItemVoWithOrderIdAndRedisId> list) {
        Map<Long, String> resultMap = new HashMap<>();
        if (list == null || list.isEmpty()) {
            return resultMap;
        }
        Map<OrderItemVoWithOrderIdAndRedisId, StringRedisTemplateWrapper> deducted = new HashMap<>();
        String secondLevelKey = list.get(0).getTag();
        List<StringRedisTemplateWrapper> wrappers =
                redisRouteService.getRedisTemplateWrapperByRouteKey(FIRSTLEVELKEY, secondLevelKey);
        if (wrappers == null) {
            wrappers = Collections.emptyList();
        }

        boolean allDeducted = true;
        try {
            for (OrderItemVoWithOrderIdAndRedisId vo : list) {
                if (!deductOnAnyNode(vo, secondLevelKey, wrappers, resultMap, deducted)) {
                    allDeducted = false;
                    break;
                }
            }
        } catch (RuntimeException e) {
            // Do not leave earlier items deducted when a later one blows up.
            try {
                rollbackStock(deducted);
            } catch (RuntimeException rollbackError) {
                e.addSuppressed(rollbackError);
            }
            throw e;
        }
        if (!allDeducted) {
            rollbackStock(deducted);
            return null;
        }
        return resultMap;
    }

    /**
     * Asynchronously restores the stock of the given items on the node recorded in each
     * item's {@code redisNetAddr}. Items whose node is not part of the current route are
     * skipped.
     *
     * @param list the items to roll back; the route is taken from the first item's tag
     * @return always 0; the rollbacks themselves run on the executor
     */
    @Override
    public int rollbackStock(List<OrderItemVoWithOrderIdAndRedisId> list) {
        if (list == null || list.isEmpty()) {
            return 0;
        }
        List<StringRedisTemplateWrapper> wrappers =
                redisRouteService.getRedisTemplateWrapperByRouteKey(FIRSTLEVELKEY, list.get(0).getTag());
        Map<String, StringRedisTemplateWrapper> wrapperByAddr = new HashMap<>();
        if (wrappers != null) {
            for (StringRedisTemplateWrapper wrapper : wrappers) {
                wrapperByAddr.put(wrapper.getNetAddr(), wrapper);
            }
        }
        for (OrderItemVoWithOrderIdAndRedisId vo : list) {
            executorService.submit(() -> {
                StringRedisTemplateWrapper wrapper = wrapperByAddr.get(vo.getRedisNetAddr());
                if (wrapper != null) {
                    rollbackStock(vo, wrapper);
                }
            });
        }
        return 0;
    }

    /** Placeholder; not implemented, always returns {@code null}. */
    public String executeLua(StringRedisTemplateWrapper wrapper,List<String>keys,List<String> argvs){
        return null;
    }

    /**
     * Runs the deduction script for one item on one node.
     *
     * @param vo                   the order item
     * @param redisTemplateWrapper the node to deduct on
     * @return {@code "<deduction id>&<node address>"} (the address is needed for rollback),
     *         or an empty string if the script refused the deduction
     */
    public String deductStock(OrderItemVoWithOrderIdAndRedisId vo, StringRedisTemplateWrapper redisTemplateWrapper){
        OrderItemEntity orderItemEntity = vo.getOrderItemEntity();
        StringRedisTemplate stringRedisTemplate = redisTemplateWrapper.getStringRedisTemplate();
        List<String> keys = new ArrayList<>();
        List<String> args = new ArrayList<>();
        keys.add(orderItemEntity.getSkuId().toString());
        // The redo log is spread over 50 zsets to increase concurrency.
        long redoLogQueueId = (orderItemEntity.getSkuId()) % 50;
        keys.add("sku_redo_log:" + redoLogQueueId);
        args.add(orderItemEntity.getSkuQuantity().toString());
        args.add(vo.getOrderItemId().toString());

        String redisId = stringRedisTemplate.execute(DEDUCT_SCRIPT, keys, args.toArray());
        if (redisId == null || redisId.isEmpty()) {
            // Keep "refused" recognisable; appending the address would hide it from callers.
            return "";
        }
        return redisId + "&" + redisTemplateWrapper.getNetAddr();
    }

    /**
     * Synchronously rolls back every item in the map on its associated node. All entries are
     * attempted even if one of them throws; the first exception is rethrown afterwards with
     * the others attached as suppressed. Per-item return codes are not evaluated.
     *
     * @param map item -> node on which the item was deducted
     * @return the string {@code "rollback success"} when no rollback threw
     */
    public String rollbackStock(Map<OrderItemVoWithOrderIdAndRedisId, StringRedisTemplateWrapper>map){
        RuntimeException failure = null;
        for (Map.Entry<OrderItemVoWithOrderIdAndRedisId, StringRedisTemplateWrapper> entry : map.entrySet()) {
            try {
                rollbackStock(entry.getKey(), entry.getValue());
            } catch (RuntimeException e) {
                if (failure == null) {
                    failure = e;
                } else {
                    failure.addSuppressed(e);
                }
            }
        }
        if (failure != null) {
            throw failure;
        }
        return "rollback success";
    }

    /**
     * Restores the stock of one item on one node by adding back the item's sku quantity.
     *
     * @param vo      the item to roll back
     * @param wrapper the node the item was deducted on
     * @return the script result (0 on success); -1 if the node is not available for the
     *         item's tag, the script returned nothing, or the command timed out (in which
     *         case a health check for the node is queued)
     */
    public int rollbackStock(OrderItemVoWithOrderIdAndRedisId vo, StringRedisTemplateWrapper wrapper){
        if (wrapper.beforeExecute(vo.getTag()) != 0) {
            return -1;
        }
        StringRedisTemplate stringRedisTemplate = wrapper.getStringRedisTemplate();
        List<String> keys = new ArrayList<>();
        List<String> args = new ArrayList<>();
        keys.add(vo.getOrderItemEntity().getSkuId().toString());
        // Give back exactly what deductStock took: the sku quantity.
        args.add(vo.getOrderItemEntity().getSkuQuantity().toString());
        try {
            Long res = stringRedisTemplate.execute(ROLLBACK_SCRIPT, keys, args.toArray());
            return res == null ? -1 : res.intValue();
        } catch (RuntimeException e) {
            if (!isCommandTimeout(e)) {
                throw e;
            }
            RedisCommandTimeoutWorker.publishEvent(wrapper);
            return -1;
        }
    }

    /** Tries the available nodes in order; records and returns true on the first success. */
    private boolean deductOnAnyNode(OrderItemVoWithOrderIdAndRedisId vo,
                                    String secondLevelKey,
                                    List<StringRedisTemplateWrapper> wrappers,
                                    Map<Long, String> resultMap,
                                    Map<OrderItemVoWithOrderIdAndRedisId, StringRedisTemplateWrapper> deducted) {
        for (StringRedisTemplateWrapper wrapper : wrappers) {
            if (wrapper.beforeExecute(secondLevelKey) != 0) {
                continue; // node or route currently not allowed to serve
            }
            String result;
            try {
                result = deductStock(vo, wrapper);
            } catch (RuntimeException e) {
                if (isCommandTimeout(e)) {
                    RedisCommandTimeoutWorker.publishEvent(wrapper);
                }
                throw e;
            }
            if (result == null || result.isEmpty()) {
                continue;
            }
            resultMap.put(vo.getOrderItemId(), result);
            deducted.put(vo, wrapper);
            return true;
        }
        return false;
    }

    /** Tells whether the error is, or was caused by, a Lettuce command timeout. */
    private static boolean isCommandTimeout(Throwable error) {
        // The template may wrap the Lettuce exception, so the cause chain is inspected.
        Throwable current = error;
        while (current != null) {
            if (current instanceof RedisCommandTimeoutException) {
                return true;
            }
            Throwable cause = current.getCause();
            current = (cause == current) ? null : cause;
        }
        return false;
    }
}
最后修改时间:2025-10-25 22:02:57
「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。




