1.for循环扣减:对数据进行存储时,第一需要考虑的就是数据存储选型,mysql, redis,elasticsearch,mq,每种存储都有擅长的地方,和不擅长甚至容易出问题的地方.需要根据存储数据的特点结合数据存储的特点来选择;
先分析数据存储的特点:
| 存储类型 | 事务支持 | 写入速度 (tps) | 查询数据量(单表或支持分片查询) | 写后从库可见性 | 查询方式 | 查询(读取)速度(qps) | 存储模式(是否支持频繁删除或持久化长时间存储) |
|---|---|---|---|---|---|---|---|
| mq | 单行事务 | 写入快 >50000tps | 不支持随机查询 | 无 | 不能查询 | 不支持查询 | 不支持频繁删除 |
| mysql | 支持多行事务 | 写入慢,3000tps> | 查询数据量小,只能单表查询,不支持分片查询 | 从库延迟时间不确定 | 支持查询 | 查询速度慢,当大批量查询时有IO压力 | 不支持频繁删除 |
| elasticsearch | 单行事务 | 写入慢,3000tps> | 查询数据量大,允许分布式分片查询 | 秒级延迟 | 支持查询 | 查询速度较慢,有延迟 | 不支持频繁删除 |
| redis | 不支持事务(mysql多次写入可以解决) | 写入快>100000tps | 查询数据量小,只能单节点查询,不支持分片查询 | 从库延迟时间不确定 | 只允许kv查询 | 查询速度快,大批量传输也有一定的IO压力, | 支持频繁删除 |
从上可以归纳出个数据库都有一个极为擅长的点:
mysql:多行事务
mq:写入快且支持单行事务
elasticsearch:支持大规模分片查询
redis:写入快,查询快,支持频繁删除
elasticsearch和redis的区别在于elasticsearch可以持久化存储数据可以做到节点崩溃数据还在,redis本身就会在GC时删除键,redis也不支持分片存储无法自动路由(只能根据key的hash值来路由),如果希望数据一直在存储里,且是大规模数据,应该选用elasticsearch.
再来归纳数据的属性
| 数据属性 | 事务支持 | 写入速度(tps) | 写对读的关系:写入延迟要求 | 单次查询数据量(单表或支持分片查询) | 查询方式 | 查询(读取)速度(qps) | 存储模式(是否支持频繁删除或持久化长时间存储) |
|---|---|---|---|---|---|---|---|
| 单行事务 | 要求高>50000tps | (允许高延迟)500ms~1s | 单值查询 | 范围查询 | 500ms以上秒级别 | 持久存储 | |
| 多行事务 | 写入慢,3000tps< | (低延迟)500ms> | 分片查询 | kv查询 | 100ms以下 | 业务过后就要被删除(秒杀) |
因此可以根据数据库的特点来反推数据是否依赖这些特性.来确定选型.可以列一个表,分析数据在六个方面的属性特点,来决定选用哪种选型;实际上如果支持
如商品数量,具有写入速度快,需要查询的特点,那么就是redis.
如商品介绍,商品详情,数据量很大,可以使用elasticsearch,
再如定时任务:需要频繁删除,可以用redis;
秒杀任务:写入速度高,需要事务,用mq.
如果使用debezium记录mysql binlog,在同步到kafka,进而来到redis,由于kafka每秒50000tps,因此debezium到kafka延迟在100ms内,mysql到debezium200ms左右, kafka消费端到redis100ms以内,整体延迟不超过500ms,
实际的问题应该是,redis键消失时,可以通过mysql从库来更新,
因此高铁抢票业务:
redis扣减票数->写日志到rocketmq里->mq消费者写mysql->debezium消费binlog->同步到kafka里->kafka消费者同步到redis里
mysql操作:
1.扣减票数
2.建立订单消息. 实际上mysql主库仍然需要一个订单状态表,来保证扣减票数和建立订单是一个本地事务,因此不同的车票需要放到不同的库,
扣票逻辑:
1.建立订单, 写消息队列,预扣redis里的库寸,订单信息可以同步到redis里,之后延迟写到mysql表
2.根据支付状态扣减库寸,更新同库里的扣减状态,支付状态也可以放到redis里,加快查询;
3.根据扣减状态,更新订单内容
那么综上,核心库票库需要保留扣减状态表和数据表.
唯一id需要反应订单建立时间,以天为单位.
1.实现唯一ID服务端,客户端,
服务端: 集群代理交给nacos,单点提供服务,
客户端:提供重试机制,核心是CompletableFuture的onException,失败时重新发起服务,超过重试次数后就返回错误
客户端实现,该方式需要Feign或者Dubbo,为了保证改动,可以提前定义一个接口来实现这个功能
public Interface UniqueIdService{ long getUniqueId(); }由于试错机制是每个实现类都需要的公共逻辑,可以抽取一个AbstractUniqueIdService,或者使用aop切面
首先使用feign作为实现:
public class UniqueIgGetterFeignImpl implenments UniqueIdGetter{ UniqueIdFeignService uniqueIdFeignService;
public long getUniqueId(){ //CompletableFuture提供试错机制. return uniqueIdFeignService.getUniqueId(); }
}添加SpringAop切面
@Aspect
@Component
@Slf4j
public class UniqueIdServiceAspect {
@Value("uniqueService.retry.times")
Integer uniqueIdRetryTimes;
// 定义环绕通知,拦截被@Loggable注解标记的方法
@Around("execution(* com.cinus.order.service.UniqueIdService.getUniqueId(..))")
public Object aroundGetUniqueId(ProceedingJoinPoint joinPoint) throws Throwable {
String methodName = joinPoint.getSignature().getName();
log.debug("开始执行方法: {}" , methodName);
long start = System.currentTimeMillis();
Object result=null;
for (int i = 0; i < uniqueIdRetryTimes; i++) {
result = joinPoint.proceed(); // 远程调用.
if(result==null||Long.valueOf(result.toString())<0){
continue;
}else {
log.debug("unique获取成功,uniqueId={}",result);
return result;
}
}
long elapsedTime = System.currentTimeMillis() - start;
System.out.println("uniqueId获取失败: " + methodName + ",执行时间:" + elapsedTime + "毫秒");
return null;
}
}方案缺点: feign基于http1的请求应答模式,并发性能不好,未来可以借鉴lettuce的管道发送方式作为改善.
提交订单,扣减库寸的逻辑:
数据分析:订单提交tps很高,要求单行事务,可以使用rocketmq来实现,此外还要扣减在redis里的库寸,如果redis里面没有货了,需要回滚日志.可以把消息写入到redis里,加快后续查询.提交订单成功后就可以调取支付模块了.这需要客户端重新发起连接.是后续的事情.
redis扣减库寸逻辑:
redis扣减库寸使用Lua脚本实现,同时,库寸不足时需要回滚这个同样在lua里面实现.lua脚本:
1.for循环扣减
2.取出当前值,若小于扣减数量,将flag定为false,将Index定义为当前i值中断逻辑
3.如果flag为false,执行for循环,恢复库存
提交订单方法名:
public int submitOrderWithMessageQueue(OrderSubmitVo orderSubmitVo);整体上扣减库存流程:
1.redis写订单
2.扣减库存
3.写消息队列
4.消息队列失败需要回滚redis,
需要定义redis写订单,扣库寸,回滚的方法
public int submmitOrderToRedis(){}//提交订单,可以保存到用户为key的zset里 public int kjOrderToRedis(List<>){}//扣减库存 public int rollbackToRedis(List<>,long orderId,long userId){}redis回滚 public int sendToMessageQueue(Order order){}//发送到消息队列库寸扣减lua
--商品库寸扣减lua
local index=1
local flag=1
local limit=tonumber(ARGV[1])
for i=2,limit do
local num= tonumber(redis.call('GET',KEYS[i]));-- 获取商品数量
local result= redis.call('decrby',KEYS[i],ARGV[i])
if num<tonumber(ARGV[i]) then -- 小于扣减的商品数量就将flag置为0
index=i
flag=0
break
end
end
if (flag==0) then
for i=2 ,index do
redis.call('INCRBY',KEYS[i],tonumber(ARGV[i])) --回退商品
end
return index
end
return 0扣减库存是要不要锁MySQL,实际上商品数量小于订单数量是一个小概率事件,大部分商品都不会销售者那么快,所以只在redis里扣减库寸
redis目前使用Jedis,似乎lettuce在实现上不支持批量redis.
提交订单的方法:
1.根据购物车选项创建订单项,计算总价格,填写收货地址
2.生成订单id
3.提交订单内容(到redis里)
4.扣减库存
5.写消息队列
6.判断是否需要回滚
可以并行的路线有:
1.1->3->4
2.2
1,2结束后写消息队列, 6根据消息队列来判断是否需要回滚
顾客在购物车处选择确认订单实际上生成了一个只读的订单内容,提交到服务端,服务端会生成一个版本号,防重令牌.防重令牌代表着一次购物车里选中的购物项,这需要远程调用购物
购物车的内容有可能很多,避免这种写入,
1.调用购物车服务,拿到选中的商品
2.调用运费计算模块,计算运费
3.生成订单以及订单明细
4.写消息队列.
那么顾客在提交订单支付时,会携带这个防重令牌,服务端根据这个防重令牌,找到zset,并找到购物车里的内容,计算价格
后续支持:实际上得益于唯一id,根据订单号就可以查询支付状态,进行库寸扣减.那么支付成功后,实际上扣减库寸仍然会进行,我们要明白,其实已经经过redis预扣了,不至于大量没有.
这里的问题其实是redis,和mysql的数据延迟,如果两者相近,
rocketmq消费者消费:
1.由于订单异步落库,消费者需要异步处理多个任务:
a.扣减库寸
b.建立订单
c.回单
d.支付成功后的业务, 应该就是回调通知用户就结束了.
e.支付成功后立即发货.
这个其实可以通过回调来实现.
支付成功->写消息队列. 两个消费者,一个负责回调用户,一个负责发货.用户客户端负责校验.
扣减库寸由支付回调函数实现, 第三方支付成功后会回调地址,该url对应的函数写支付成功消息队列
建立订单由消息队列实现,mq消费者消费订单明细
回单采用延迟任务实现,消费者将已提交订单消息队列里的订单号写入延迟队列
发货逻辑:对于支付成功的订单,执行发货逻辑.这里也可以用延迟队列来实现.
回单属于延迟任务,用于超时支付. 这里可以用消息队列再次写消息队列实现. 扣减库存,消息队列扣减库存,其实可以提供两次机会,支付成功后回调时再扣减一次,这时还没有就算了. 支付成功后,需要快速判断扣减成功以及能否派货. 这个需求当前业务是做不到的. 建立订单,另一个消费者执行,由于分布式的情况, 派货业务无法立即开始,因为不知道什么时候支付成功了,数据库扣减库存逻辑: 支付成功后回调写消息队列,消息队列的消费者扣减库存,如果库寸不足直接报告失败.写入退款的消息队列
1.建立订单,分为订单详情和订单明细:
由消息队列消费者实现,拿到消息队列后异步扣减.此处应该需要shardingJDBC,因此也是用接口先实现
通过OnMessage,onMessage会自动提交消息位移,
优化:采用多线程消费者进一步加快消息消费进度.因为本质就是写数据库而已.不需要顺序消费
@Component@Slf4j
@RocketMQMessageListener(topic = "tbmall-order", consumerGroup = "tbmall-order-toDb-consumer-group",
// selectorType = SelectorType.SQL92,
// selectorExpression = "age > 12",
messageModel= MessageModel.CLUSTERING
)
public class OrderToDbConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
OrderCreateTo orderCreateTo = JSON.parseObject(message, OrderCreateTo.class);
mqOrderToDbService.saveOrder(orderCreateTo); //插入到数据库中
}
}插入数据库方法名:
@Componentpublic class MqOrderToDbServiceAloneImpl implements MqOrderToDbService {
@Autowired
OrderService orderService;
@Override
public int saveOrder(OrderCreateTo orderCreateTo) {
orderService.saveOrder(orderCreateTo);
return 0;
}
}2.扣减库寸消费者,
消费者需要根据订单明细,实现扣减库寸的逻辑.消费者可以从redis或者mysql里拿到订单明细,
其实正确的方法应该是,支付回调时同步写入订单详细信息.但我们应该明白支付宝不一定能拿到的.靠谱的做法还是消息不存在就直接写到另一个消息队列里,实际上用不了这么久.订单表需要实现分库分表,多线程消费情况下比扣减库存逻辑快得多.扣减库寸tps可能也就2000qps,插入订单的逻辑只要分的库够多应该可以达到上万的tps,这点没问题.托底就是放入延迟队列,稍后扣减库存.
扣减库存可以从redis,mysql获取数据,获取不到该条消息就放入延迟队列里处理./
代码逻辑:
0.解码得到订单id;
1.获取订单信息
2.扣减库寸(数据库事务1)
3.记录扣减状态扣减失败就需要回退订单(数据库事务2)
4.将扣减库存执行结果写入消息队列里(如果扣减成功,就发送出货相关信息;如果扣减失败,发送退款逻辑)
方法需要正确处理2,3的两个事务的关系以及4写消息队列与数据库事务的关系,下面对两者执行结果产生的影响进行分析:
1.如果数据库事务执行成功,消息发送失败,通过重新消费消息解决,数据库有幂等性.
2.如果数据库执行失败,消息发送成功,这就是绝不允许出现的,
因此可以通过rocketmq事务消息解决这个问题.又或者采用底层控制的事务解决,让事务必须提前执行.两者再tps上差距会比较明显,倾向于选择底层事务控制.
底层事务控制方法:
手动设置事务 (此后不管做什么事务都已经提交了,剩下执行的情况已经无关了) 发送队列(发送失败就重新执行.)
@Component@Slf4j
@RocketMQMessageListener(topic = "tbmall-order-sub", consumerGroup = "tbmall-order-toDb-substract-consumer-group",
// selectorType = SelectorType.SQL92,
// selectorExpression = "age > 12",
messageModel= MessageModel.CLUSTERING,
consumeMode = ConsumeMode.CONCURRENTLY
)
public class OrderToSubstractConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
Long orderId= Long.valueOf(message); Order order=getOrderFromDb(orderId);//从数据库,redis里获取数据 int status= executeOrder(Order);//扣减库存;返回值表示是否成功 这里不仅要扣减库存还要插入一条数据,表示插入已经成功,作为幂等性验证. //失败就要发起退款操作了;, if(!status) rollabckOrder(order);//这里是写消息队列,表示现在要退款了; }
}
扣减方法逻辑
扣减有两种情况, 扣减成功1 扣减失败-1 ,扣减失败表示有商品库寸不足,需要退货,因此
* 回退时status需要从1变为2表示回退成功
* 对于扣减库寸业务,扣减时该行不能存在,存在就表明已经消费过了.也不用消费该消息
* 需要做到扣减库寸失败时,要记录扣减状态为-1, 扣减业务需要回滚
因此这是两个事务:
1.扣减库存事务
2,记录扣减状态事务;
这里的关键是2如果失败, 1也必须失败;1失败,2可以提交.因此2是子事务,1是父事务,方法应该写在WareSkuLockStatusService里,
2可以根据1抛出的异常记录决定自己是否提交
a.1库存不足,2可以提交,记录status=-1
b.1抛出其他异常,2需要回滚,
c.1正常执行,2可以提交.
d.2失败,1必须回滚
rocketmq事务,扣减时必须记录下sku对应仓库的id,扣减的数量,以及一些的明细,给后续仓库出货提供信息,
public int KjStockInDb(Long orderId){
WareSkuLockStatusEntity statusEntity = wareSkuLockStatusService.getById(orderId);
if(statusEntity!=null){
/**
* 扣减由两种情况, 扣减成功1 扣减失败-1 ,扣减失败表示有商品库寸不足,需要退货,因此
*/
//status不为0,1表示已经扣减成功,2表示回退成功,都表示不需要再消费本次消费,
//实际上只要status不为null,就表示肯定扣减成功了.
return 0;
}
WareSkuLockVo wareSkuLockVoFromDb = getWareSkuLockVoFromDb(orderId);
return 0; }
mysql订单扣减
在商城项目初始时,所有sku都放在一个库里,这样是不对的,应该做的应该是按照spu,种类进行分库,最后的扣减则由订单状态表统一归纳,订单扣减失败时,订单中扣减成功的商品通过消息队列异步返还,
1.库存库的库存表扣减库存
2.根据扣减结果更行订单项表
3.根据所有订单项的扣减结果更新订单结果
库存扣减可能只有20个库,但订单库可能有100个,核心是说发送线程要不要是两个消费者,应该没必要,扣减后直接发送消息.应该可以,这也没什么.失败了再发就好了.也就是说,只要发过去的,一定是成立的.
后续消费者根据扣减结果修改订单库里的订单项表,扣减失败的执行退款
整体上只有两个消息队列参与:
tbmall-stock-deduct:存储订单项
tbmall-stock-detuct-status: 更新扣减状态.
退款我感觉不要这么着急,真不能这么急.可以不做的.
两个消息队列内容是一样的,其实可以按照一个来处理.前一个消费者需要等到后一个执行成功才能消费.所以这个业务需要使用延时队列来实现是最好的,因为有前边的redis不会超卖的保证,这里这么做是合理的.
1.扣减失败时,需要向kafka/rocketmq发送消息,表示某订单号代表的订单存在扣减失败的情况,消费者拿到消息后更新订单状态为扣减失败,于此同时,其他扣减正常进行.
步骤1中会存在两种异常情况, 事务执行完成,但是消息发送失败或者事务执行失败,消息发送成功,/以及消息实际接收成功,但是因为网络,发送者没有接收到,导致发送者认为执行失败
导致消费者认为消息消费失败而重复消费,重复消费时,扣减库存成功,消息发送成功,这里业务无法保证幂等,需要使用事务消息.回查本地事务状态.
事务执行成功,但消息发送失败时,消息消费失败,会导致重复消费,但这重复消费时发现事务执行成功,导致消费者认为消息已经消费成功,
那么这里的问题是,发送消息队列和事务执行不能再一个事务里,应该交给另一个消费者来执行,称B,执行事务消费者称为A, B需要在A后面执行,如何确保B一定落后A.A向B发送信号不可取,可以让B查看A的消费位移来决定是否拉取最新消息.这可以通过redis实现.这个时延是很低的,可以这么做.
2.另一消费者对支付成功的订单进行统一处理,根据扣减状态,决定是否返还库寸.
3.那么哪个返还库寸呢?无法确定是否成功,只能通过延时任务,来选择库寸,因为无法确保每时每刻订单项的扣减情况
1.比较好的方法是: 扣减库存前,查询redis缓存,看看某个订单的状态,查不到表示没问题.继续执行. 查询到就终止本次执行,并表示消息消费成功,
对于已经成功的.需要及时回退库存,这个该怎么做呢?实际上只能通过延时消息兜底返还.库寸. 或者说业务上就是用部分退款的策略,并由用户主动退款来解决.
总结:
订单扣减分布式事务:
1.提交订单时发送两个消息队列:
1.扣减消息队列:扣减数据库库存: tbmall-order
2.延时消息队列,关单: tbmall-order-delay-check
2.扣减订单成功后,再发消息队列,用于更新订单状态: tbmall-order-orderitem-deduct-status
3.延时关单时发送回退库存消息队列: tbmall-ware-restock
4.回退库寸成功后,更新回退成功消息队列: tbmall-ware-restock-status
5.支付成功,回调地址写支付成功消息队列: tbmall-order-payment
共6个消息队列支持;
1.sku按照种类分库,扣减库存时按照订单项扣减,使用消息队列,这里有两个消息队列消费者A,B
A负责扣减库存,写扣减状态, 完成后将偏移量同步到redis中
B负责查看redis里的偏移量,实现延时队列的效果,根据A扣减的状态更新订单状态(可以实现部分退款)
2.延时队列C对数据进行统一兜底,查看订单状态,完成部分退款任务.
扣减库存的关键是redis里的库寸要时刻小于等于mysql里的,这是业务的关键.这涉及到redis里的数据崩溃后怎么恢复.
崩溃后持有数据的是mysql里的库,redis子库,实际上是不可能的,而且双11也是支持部分退货的.因此.部分扣减库存,返回优惠券是合适的,如果该商品具有专属优惠券,就返还专属优惠券,否则就不做返还.
从实际角度来看,商城项目中,debezium向redis同步mysql数据是不对的,应该是通过消息队列定时关单实现返还库存,然后在闲时对比redis与mysql数据,对就是这样,返还库寸用的是定时关单系统.
如何避免redis单点故障呢,还是应该把数据放到多个地方,
实际可以维护一个锁单的zset,在恢复库存时做到 恢复库寸和删除锁单项的原子性. redis的数据同步只能防止少卖.
利用消息队列关单,只能多次消费. 其实可以使用bitmap来实现关单的原子性.
实际上真实扣减的数据在mq里,可以在mysql里记录每个redis版本的初始值,
skuId: 200
那么kafka在退单时可以拿到这个数量, 其实退单可以使用聚合消费的策略. redis崩溃后应该停机10分钟,确保之前的订单都消费完成,然后使用消息队列恢复数据.
我们可以把返单记录为一个10分钟一次的定时任务,然后每个周期向redis更新这些数值即可.
实际上不解决问题.这个应该是库寸为0,redis崩溃以后的解决策略.定时任务需要查询当前商品数量是否是0,如果是,那就可以这么处理;
实际上主动维护是一个更好的思路,比如我们有5台redis机器,redis代理按照时间的策略,每一个周期都有一台redis进入维护期,期间执行数据对账,订单时间是10分钟,那么维护时间为15分钟,期间mq消费者执行回单业务.对于崩溃的使用mysql数据来全量恢复.
正常节点怎么处理:
正常节点应该就是mq消费了. redis里面可以缓存某个key的redo_log_zset,定时任务回放这个更新就行了,一旦出现异常比如zset不在了,直接将商品数量置为0
redis, mysql数据同步
redis使用版本号,redo_log队列保证回退库寸,崩溃后利用数据库重建
库存扣减:
tagA:扣减库存 将状态发给tagC消费的消息队列
tagB:记录订单项 记录订单项.
tagC:记录订单扣减状态 记录到扣减明细表;
tagD:延时关单,根据订单号查询订单支付情况,据此退回库存,延时时间为10分钟
实际上C是下游,就是要靠消息队列来实现.没有别的办法.
tagD比较复杂,需要
扣减成功但没有付款的:
a:扣减明细表的内容取出来退回到库存单里.
b:更新订单明细表表示已经退回库存;
c.更新订单状态为已经退完库存了 c是延迟消息;
a,b仍然需要消息队列来实现,tagD发送需要返回库存的消息,下一个需要发送更新订单状态 的消息,c只能是是采用延迟消息实现.
因此tagD需要做的就是发送两次消息,
批量回退消息
orderEntity消息
改良后决定只发一次消息, 这样消费到orderEntity时大概率所有都回退成功了,如果成功了,应该可以做二次转发,转发给更新订单状态的消息队列,让他去查看回退状态.嗯嗯,这样可以.
之后两个消费者开始执行回扣库存业务
延时消费(5min)检查库存更新状况,必须保证所有都退回,然后更新订单状态.找不到的接着延时,
部分扣减失败的,付款成功的:
a.部分退款
b.回退库存;
c.更新扣减明细表状态为已经回退
扣减失败,没有付款:
扣减成功,付款成功:
a.安排发货
a,b仍然是上下游,还是要发消息队列来实现.




