- 使用 Spring Initializr 创建 Spring Boot 应用程序
- 在Spring Boot中配置Cassandra
- 在 Spring Boot 上配置 Tomcat 连接池
- 将Camel消息路由到嵌入WildFly的Artemis上
如果一个饭店只有一个服务员,并且这个服务员不仅需要负责客人的点餐服务,还需要负责炒菜服务,显然这样的话,只能是先处理完第一个客人所有的点餐,烧菜任务后,才能去处理下一个客人的点餐,烧菜任务,这样显然把任务给串行化了,效率大大降低。
而现在我们就面临这样的问题:
目前整个秒杀的过程都是串行化执行的,并且这个流程里面涉及多次数据库查询操作,数据库查询是最耗费时间的,因此优化的思路就是把最耗费时间的数据库写操作转换为异步执行,然后把数据库查询操作通过redis查询替换掉,这样整体就分为了两部分,一部分是主线程去redis判断校验,然后如果判断和校验都通过了,就将消息放入一个队列中,异步线程从该队列中取出消息,然后去执行数据库写操作。
此时redis就相当于服务员,负责库存数量判断和重复购买校验,然后将合法的订单交易,放入队列中,异步处理线程,从队列读取消息,进行数据库写处理,即扣减库存,创建订单的耗时逻辑,全部异步完成。
显然,关于redis那部分判断逻辑,应该都由lua脚本来完成,而非java代码
1.新增优惠卷的同时,将优惠卷信息保存到Redis中
@Override
@Transactional
public void addSeckillVoucher(Voucher voucher) {
// 保存优惠券
save(voucher);
// 保存秒杀信息
SeckillVoucher seckillVoucher = new SeckillVoucher();
seckillVoucher.setVoucherId(voucher.getId());
seckillVoucher.setStock(voucher.getStock());
seckillVoucher.setBeginTime(voucher.getBeginTime());
seckillVoucher.setEndTime(voucher.getEndTime());
seckillVoucherService.save(seckillVoucher);
//保存优惠卷信息到Redis
stringRedisTemplate.opsForValue().set(SECKILL_STOCK_KEY+voucher.getId(),voucher.getStock().toString());
}
测试:
2.lua脚本编写
-- 1.参数列表
-- 1.1 优惠卷id
local voucherId= ARGV[1]
-- 1.2 用户id
local userId= ARGV[2]
--2.数据key
--2.1库存key
local storeKey="seckill:stock:" .. voucherId
--2.2订单key
local orderKey="seckill:order:" .. voucherId
--3.脚本业务
--3.1判断库存是否充足 get storeKey
if(tonumber(redis.call('get',storeKey))<=0) then
--3.2库存不足,返回1
return 1
end
--3.2判断用户是否下单--set集合的判断方法,判断某个集合中是否存在某个value
if(redis.call('sismember',orderKey,userId)==1) then
--3.3存在,说明是重复下单,返回2
return 2
end
--3.4扣库存incrby storeKey -1
redis.call('incrby',storeKey,-1)
--3.5下单(保存用户)sadd orderkey userId
redis.call('sadd',orderKey,userId)
return 0
3.修改抢购逻辑
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
@Autowired
private ISeckillVoucherService iSeckillVoucherService;
@Autowired
private StringRedisTemplate stringRedisTemplate;
@Autowired
private RedissonClient redissonClient;
@Autowired
private RedisWorker redisWorker;
private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
static {
SECKILL_SCRIPT=new DefaultRedisScript<>();
SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
SECKILL_SCRIPT.setResultType(Long.class);
}
@Override
@Transactional
public Result seckillVoucher(Long voucherId) {
Long uid = UserHolder.getUser().getId();
//1.执行lua脚本
Long res = stringRedisTemplate.execute(SECKILL_SCRIPT, Collections.emptyList(), voucherId.toString(), uid.toString());
//2.判断结果是否为0
int r=res.intValue();
if(r!=0){
return Result.fail(r==1?"库存不足":"不能重复下单");
}
//3.为0,有购买资格,把下单信息保存到阻塞队列
long order = redisWorker.nextId("order");
//TODO:保存到阻塞队列
//4.返回订单id
return Result.ok(order);
}
}
当我们测试一下后:
此时数据库并无变化,因为我们还没把消息放入阻塞队列,从而通知异步线程去处理
4.异步线程处理阻塞队列中的消息
@Service
@Slf4j
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
@Autowired
private ISeckillVoucherService iSeckillVoucherService;
@Autowired
private StringRedisTemplate stringRedisTemplate;
@Autowired
private RedissonClient redissonClient;
@Autowired
private RedisWorker redisWorker;
private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
static {
SECKILL_SCRIPT=new DefaultRedisScript<>();
SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
SECKILL_SCRIPT.setResultType(Long.class);
}
/**
* 阻塞队列
*/
private BlockingQueue<VoucherOrder> orderTasks=new ArrayBlockingQueue(1024*1024);
/**
* 异步线程
*/
private static final ExecutorService SECKILL_ORDER_EXECUTOR= Executors.newSingleThreadExecutor();
@PostConstruct
public void init(){
SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
}
public class VoucherOrderHandler implements Runnable{
@Override
public void run() {
while(true){
//1.获取队列中的订单信息
try {
//1.获取队列中的订单信息
VoucherOrder voucherOrder = orderTasks.take();
//2.创建订单
createVoucherOrder(voucherOrder);
} catch (InterruptedException e) {
log.error("订单创建异常: ",e);
}
}
}
/**
* 保守起见,还会再次进行判断
*/
private void createVoucherOrder(VoucherOrder voucherOrder) {
Long userId = voucherOrder.getUserId();
Long voucherId = voucherOrder.getVoucherId();
//创建锁对象
RLock lock = redissonClient.getLock("lock:order:" + userId);
//尝试获取分布式锁
// 第一个参数为获取锁的最大等待时间(期间会重试)--默认-1,,失败直接返回
//锁自动释放时间--默认30秒
//时间单位
boolean tryLock = lock.tryLock();
if(!tryLock){
log.error("用户["+userId+"]对"+voucherId+"优惠卷重复抢购");
}
//我们只需要确保下面这两行代码的集群并发问题被解决
try{
Integer count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
if(count>0){
log.error("用户["+userId+"]对"+voucherId+"优惠卷重复抢购");
}
}finally {
lock.unlock();
}
//6.扣减库存
boolean success = iSeckillVoucherService.update()
.setSql("stock = stock -1")
.eq("voucher_id", voucherId)
.gt("stock",0)
.update();
if(!success){
log.error("库存扣减失败");
}
save(voucherOrder);
}
}
@Override
public Result seckillVoucher(Long voucherId) {
Long uid = UserHolder.getUser().getId();
//1.执行lua脚本
Long res = stringRedisTemplate.execute(SECKILL_SCRIPT, Collections.emptyList(), voucherId.toString(), uid.toString());
//2.判断结果是否为0
int r=res.intValue();
if(r!=0){
return Result.fail(r==1?"库存不足":"不能重复下单");
}
//3.为0,有购买资格,把下单信息保存到阻塞队列
VoucherOrder voucherOrder = new VoucherOrder();
long orderId = redisWorker.nextId("order");
voucherOrder.setId(orderId);
voucherOrder.setUserId(uid);
voucherOrder.setVoucherId(voucherId);
orderTasks.add(voucherOrder);
//4.返回订单id
return Result.ok(orderId);
}
}
阻塞队列里面数据过多可能会导致jvm内存溢出,还有就是即便设置了阻塞队列最大元素个数上限也有弊端,就是如果元素过多,处理速度跟不上,会导致很多额外任务放入阻塞队列失败
还有就是数据都是存放在内存中的,一旦java程序出现异常,那么内存中的任务将会全部丢失,并且一旦出现异常,也会导致某个任务执行失败
默认是非阻塞的,并且如果阻塞时长传入0,表示无限等待
相信各位光看上面的介绍,应该对Stream还是一知半解,下面我来详细介绍一下它的用法:
XADD,命令用于在某个stream(流数据)中追加消息,演示如下:
127.0.0.1:6379> XADD memberMessage * user kang msg Hello
"1651325244694-0"
127.0.0.1:6379> XADD memberMessage * user zhong msg nihao
"1651325256282-0"
其中语法格式为:
XADD key ID field string [field string …]
需要提供key,消息ID方案,消息内容,其中消息内容为key-value型数据。
ID,最常使用*,表示由Redis生成消息ID,这也是强烈建议的方案。
field string [field string], 就是当前消息内容,由1个或多个key-value构成。
上面的例子中,在memberMemsages这个key中追加了user kang msg Hello这个消息。Redis使用毫秒时间戳和序号生成了消息ID。此
XREAD,从Stream中读取消息,演示如下:
127.0.0.1:6379> XREAD streams memberMessage 0
1) 1) "memberMessage"
2) 1) 1) "1651325244694-0"
2) 1) "user"
2) "kang"
3) "msg"
4) "Hello"
2) 1) "1651325256282-0"
2) 1) "user"
2) "zhong"
3) "msg"
4) "nihao"
消息被读取后,并不会从stream队列中消失,这点需要注意
上面的命令是从消息队列memberMessage中读取所有消息。XREAD支持很多参数,语法格式为:
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key …] ID [ID …]
其中:
使用0表示从第一条消息开始
。(本例中就是使用0)此处需要注意,消息队列ID是单调递增的,所以通过设置起点,可以向后读取。在阻塞模式中,可以使用 $ 表 示 最 新 的 消 息 I D
。 ( 在 非 阻 塞 模 式 下,表示最新的消息ID)。(在非阻塞模式下无意义)。XRED读消息时分为阻塞和非阻塞模式,使用BLOCK选项可以表示阻塞模式,需要设置阻塞时长。非阻塞模式下,读取完毕(即使没有任何消息)立即返回,而在阻塞模式下,若读取不到内容,则阻塞等待。
一个典型的阻塞模式用法为:
127.0.0.1:6379> XREAD block 1000 streams memberMessage $
(nil)
(1.07s)
我们使用Block模式,配合$作为ID,表示读取最新的消息,若没有消息,命令阻塞!等待过程中,其他客户端向队列追加消息,则会立即读取到。
因此,典型的队列就是 XADD 配合 XREAD Block 完成。XADD负责生成消息,XREAD负责消费消息。
XADD生成的1553439850328-0,就是Redis生成的消息ID,由两部分组成:时间戳-序号。时间戳是毫秒级单位,是生成消息的Redis服务器时间,它是个64位整型(int64)。序号是在这个毫秒时间点内的消息序号,它也是个64位整型。较真来说,序号可能会溢出,but真可能吗?
可以通过multi批处理,来验证序号的递增:
127.0.0.1:6379> MULTI
OK
127.0.0.1:6379> XADD memberMessage * msg one
QUEUED
127.0.0.1:6379> XADD memberMessage * msg two
QUEUED
127.0.0.1:6379> XADD memberMessage * msg three
QUEUED
127.0.0.1:6379> XADD memberMessage * msg four
QUEUED
127.0.0.1:6379> XADD memberMessage * msg five
QUEUED
127.0.0.1:6379> EXEC
1) "1553441006884-0"
2) "1553441006884-1"
3) "1553441006884-2"
4) "1553441006884-3"
5) "1553441006884-4"
由于一个redis命令的执行很快,所以可以看到在同一时间戳内,是通过序号递增来表示消息的。
为了保证消息是有序的,因此Redis生成的ID是单调递增有序的。由于ID中包含时间戳部分,为了避免服务器时间错误而带来的问题(例如服务器时间延后了),Redis的每个Stream类型数据都维护一个latest_generated_id属性,用于记录最后一个消息的ID。若发现当前时间戳退后(小于latest_generated_id所记录的),则采用时间戳不变而序号递增的方案来作为新消息ID(这也是序号为什么使用int64的原因,保证有足够多的的序号),从而保证ID的单调递增性质。
强烈建议使用Redis的方案生成消息ID,因为这种时间戳+序号的单调递增的ID方案,几乎可以满足你全部的需求。但同时,记住ID是支持自定义的,别忘了!
当多个消费者(consumer)同时消费一个消息队列时,可以重复的消费相同的消息,就是消息队列中有10条消息,三个消费者都可以消费到这10条消息。
但有时,我们需要多个消费者配合协作来消费同一个消息队列,就是消息队列中有10条消息,三个消费者分别消费其中的某些消息,比如消费者A消费消息1、2、5、8,消费者B消费消息4、9、10,而消费者C消费消息3、6、7。也就是三个消费者配合完成消息的消费,可以在消费能力不足,也就是消息处理程序效率不高时,使用该模式。该模式就是消费者组模式。
即消费者组模式可以让多个消费者协同合作,来共同消息队列中的消息,提高队列中消息的消费效率
消费者组模式的支持主要由两个命令实现:
进行演示,演示时使用5个消息,思路是:创建一个Stream消息队列,生产者生成5条消息。在消息队列上创建一个消费组,组内三个消费者进行消息消费:
# 生产者生成10条消息
127.0.0.1:6379> MULTI
127.0.0.1:6379> XADD mq * msg 1 # 生成一个消息:msg 1
127.0.0.1:6379> XADD mq * msg 2
127.0.0.1:6379> XADD mq * msg 3
127.0.0.1:6379> XADD mq * msg 4
127.0.0.1:6379> XADD mq * msg 5
127.0.0.1:6379> EXEC
1) "1553585533795-0"
2) "1553585533795-1"
3) "1553585533795-2"
4) "1553585533795-3"
5) "1553585533795-4"
# 创建消费组 mqGroup
127.0.0.1:6379> XGROUP CREATE mq mqGroup 0 # 为消息队列 mq 创建消费组 mgGroup
OK
# 消费者A,消费第1条
127.0.0.1:6379> XREADGROUP group mqGroup consumerA count 1 streams mq > #消费组内消费者A,从消息队列mq中读取一个消息
1) 1) "mq"
2) 1) 1) "1553585533795-0"
2) 1) "msg"
2) "1"
# 消费者A,消费第2条
127.0.0.1:6379> XREADGROUP GROUP mqGroup consumerA COUNT 1 STREAMS mq >
1) 1) "mq"
2) 1) 1) "1553585533795-1"
2) 1) "msg"
2) "2"
# 消费者B,消费第3条
127.0.0.1:6379> XREADGROUP GROUP mqGroup consumerB COUNT 1 STREAMS mq >
1) 1) "mq"
2) 1) 1) "1553585533795-2"
2) 1) "msg"
2) "3"
# 消费者A,消费第4条
127.0.0.1:6379> XREADGROUP GROUP mqGroup consumerA count 1 STREAMS mq >
1) 1) "mq"
2) 1) 1) "1553585533795-3"
2) 1) "msg"
2) "4"
# 消费者C,消费第5条
127.0.0.1:6379> XREADGROUP GROUP mqGroup consumerC COUNT 1 STREAMS mq >
1) 1) "mq"
2) 1) 1) "1553585533795-4"
2) 1) "msg"
2) "5"
上面的例子中,三个在同一组 mpGroup 消费者A、B、C在消费消息时(消费者在消费时指定即可,不用预先创建),有着互斥原则,消费方案为,A->1, A->2, B->3, A->4, C->5。语法说明为:
XGROUP CREATE mq mqGroup 0
,用于在消息队列mq上创建消费组 mpGroup,最后一个参数0,表示该组从第一条消息开始消费。(意义与XREAD的0一致)。除了支持CREATE外,还支持SETID设置起始ID,DESTROY销毁组,DELCONSUMER删除组内消费者等操作。
XREADGROUP GROUP mqGroup consumerA COUNT 1 STREAMS mq >
,用于组mqGroup内消费者consumerA在队列mq中消费,参数>表示未被组内消费的起始消息,参数count 1表示获取一条。语法与XREAD基本一致,不过是增加了组的概念。
可以进行组内消费的基本原理是,STREAM类型会为每个组记录一个最后处理(交付)的消息ID(last_delivered_id),这样在组内消费时,就可以从这个值后面开始读取,保证不重复消费。
以上就是消费组的基础操作。除此之外,消费组消费时,还有一个必须要考虑的问题,就是若某个消费者,消费了某条消息,但是并没有处理成功时(例如消费者进程宕机),这条消息可能会丢失,因为组内其他消费者不能再次消费到该消息了。下面继续讨论解决方案。
为了解决组内消息读取但处理期间消费者崩溃带来的消息丢失问题,STREAM 设计了 Pending 列表,用于记录读取但并未处理完毕的消息。命令XPENDIING 用来获消费组或消费内消费者的未处理完毕的消息。演示如下:
127.0.0.1:6379> XPENDING mq mqGroup # mpGroup的Pending情况
1) (integer) 5 # 5个已读取但未处理的消息
2) "1553585533795-0" # 起始ID
3) "1553585533795-4" # 结束ID
4) 1) 1) "consumerA" # 消费者A有3个
2) "3"
2) 1) "consumerB" # 消费者B有1个
2) "1"
3) 1) "consumerC" # 消费者C有1个
2) "1"
127.0.0.1:6379> XPENDING mq mqGroup - + 10 # 使用 start end count 选项可以获取详细信息
1) 1) "1553585533795-0" # 消息ID
2) "consumerA" # 消费者
3) (integer) 1654355 # 从读取到现在经历了1654355ms,IDLE
4) (integer) 5 # 消息被读取了5次,delivery counter
2) 1) "1553585533795-1"
2) "consumerA"
3) (integer) 1654355
4) (integer) 4
# 共5个,余下3个省略 ...
127.0.0.1:6379> XPENDING mq mqGroup - + 10 consumerA # 在加上消费者参数,获取具体某个消费者的Pending列表
1) 1) "1553585533795-0"
2) "consumerA"
3) (integer) 1641083
4) (integer) 5
# 共3个,余下2个省略 ...
每个Pending的消息有4个属性:
上面的结果我们可以看到,我们之前读取的消息,都被记录在Pending列表中,说明全部读到的消息都没有处理,仅仅是读取了。那如何表示消费者处理完毕了消息呢?使用命令 XACK 完成告知消息处理完成,演示如下:
127.0.0.1:6379> XACK mq mqGroup 1553585533795-0 # 通知消息处理结束,用消息ID标识
(integer) 1
127.0.0.1:6379> XPENDING mq mqGroup # 再次查看Pending列表
1) (integer) 4 # 已读取但未处理的消息已经变为4个
2) "1553585533795-1"
3) "1553585533795-4"
4) 1) 1) "consumerA" # 消费者A,还有2个消息处理
2) "2"
2) 1) "consumerB"
2) "1"
3) 1) "consumerC"
2) "1"
127.0.0.1:6379>
有了这样一个Pending机制,就意味着在某个消费者读取消息但未处理后,消息是不会丢失的。等待消费者再次上线后,可以读取该Pending列表,就可以继续处理该消息了,保证消息的有序和不丢失。
此时还有一个问题,就是若某个消费者宕机之后,没有办法再上线了,那么就需要将该消费者Pending的消息,转义给其他的消费者处理,就是消息转移。请继续。
消息转移的操作时将某个消息转移到自己的Pending列表中。使用语法XCLAIM来实现,需要设置组、转移的目标消费者和消息ID,同时需要提供IDLE(已被读取时长),只有超过这个时长,才能被转移。演示如下:
# 当前属于消费者A的消息1553585533795-1,已经15907,787ms未处理了
127.0.0.1:6379> XPENDING mq mqGroup - + 10
1) 1) "1553585533795-1"
2) "consumerA"
3) (integer) 15907787
4) (integer) 4
# 转移超过3600s的消息1553585533795-1到消费者B的Pending列表
127.0.0.1:6379> XCLAIM mq mqGroup consumerB 3600000 1553585533795-1
1) 1) "1553585533795-1"
2) 1) "msg"
2) "2"
# 消息1553585533795-1已经转移到消费者B的Pending中。
127.0.0.1:6379> XPENDING mq mqGroup - + 10
1) 1) "1553585533795-1"
2) "consumerB"
3) (integer) 84404 # 注意IDLE,被重置了
4) (integer) 5 # 注意,读取次数也累加了1次
以上代码,完成了一次消息转移。转移除了要指定ID外,还需要指定IDLE,保证是长时间未处理的才被转移。被转移的消息的IDLE会被重置,用以保证不会被重复转移,以为可能会出现将过期的消息同时转移给多个消费者的并发操作,设置了IDLE,则可以避免后面的转移不会成功,因为IDLE不满足条件。例如下面的连续两条转移,第二条不会成功。
127.0.0.1:6379> XCLAIM mq mqGroup consumerB 3600000 1553585533795-1
127.0.0.1:6379> XCLAIM mq mqGroup consumerC 3600000 1553585533795-1
这就是消息转移。至此我们使用了一个Pending消息的ID,所属消费者和IDLE的属性,还有一个属性就是消息被读取次数,delivery counter,该属性的作用由于统计消息被读取的次数,包括被转移也算。这个属性主要用在判定是否为错误数据上。请继续看:
正如上面所说,如果某个消息,不能被消费者处理,也就是不能被XACK,这是要长时间处于Pending列表中,即使被反复的转移给各个消费者也是如此。此时该消息的delivery counter就会累加(上一节的例子可以看到),当累加到某个我们预设的临界值时,我们就认为是坏消息(也叫死信,DeadLetter,无法投递的消息),由于有了判定条件,我们将坏消息处理掉即可,删除即可。删除一个消息,使用XDEL语法,演示如下:
# 删除队列中的消息
127.0.0.1:6379> XDEL mq 1553585533795-1
(integer) 1
# 查看队列中再无此消息
127.0.0.1:6379> XRANGE mq - +
1) 1) "1553585533795-0"
2) 1) "msg"
2) "1"
2) 1) "1553585533795-2"
2) 1) "msg"
2) "3"
注意本例中,并没有删除Pending中的消息因此你查看Pending,消息还会在。可以执行XACK标识其处理完毕!
Stream提供了XINFO来实现对服务器信息的监控,可以查询:
查看队列信息
127.0.0.1:6379> Xinfo stream mq
1) "length"
2) (integer) 7
3) "radix-tree-keys"
4) (integer) 1
5) "radix-tree-nodes"
6) (integer) 2
7) "groups"
8) (integer) 1
9) "last-generated-id"
10) "1553585533795-9"
11) "first-entry"
12) 1) "1553585533795-3"
2) 1) "msg"
2) "4"
13) "last-entry"
14) 1) "1553585533795-9"
2) 1) "msg"
2) "10"
消费组信息
127.0.0.1:6379> Xinfo groups mq
1) 1) "name"
2) "mqGroup"
3) "consumers"
4) (integer) 3
5) "pending"
6) (integer) 3
7) "last-delivered-id"
8) "1553585533795-4"
消费者组成员信息
127.0.0.1:6379> XINFO CONSUMERS mq mqGroup
1) 1) "name"
2) "consumerA"
3) "pending"
4) (integer) 1
5) "idle"
6) (integer) 18949894
2) 1) "name"
2) "consumerB"
3) "pending"
4) (integer) 1
5) "idle"
6) (integer) 3092719
3) 1) "name"
2) "consumerC"
3) "pending"
4) (integer) 1
5) "idle"
6) (integer) 23683256
至此,消息队列的操作说明大体结束!
lua脚本改造
-- 1.参数列表
-- 1.1 优惠卷id
local voucherId= ARGV[1]
-- 1.2 用户id
local userId= ARGV[2]
--1.3订单id
local orderId=ARGV[3]
--2.数据key
--2.1库存key
local storeKey="seckill:stock:" .. voucherId
--2.2订单key
local orderKey="seckill:order:" .. voucherId
--3.脚本业务
--3.1判断库存是否充足 get storeKey
local storeNum=redis.call('get',storeKey)
--3.2 如果redis中没有该优惠卷库存记录,返回3
-- lua中只有false和nil是假值, 其他都是真值
if(storeNum) then
else
return 3
end
if(tonumber(storeNum)<=0) then
--3.2库存不足,返回1
return 1
end
--3.2判断用户是否下单--set集合的判断方法,判断某个集合中是否存在某个value
if(redis.call('sismember',orderKey,userId)==1) then
--3.3存在,说明是重复下单,返回2
return 2
end
--3.4扣库存incrby storeKey -1
redis.call('incrby',storeKey,-1)
--3.5下单(保存用户)sadd orderkey userId
redis.call('sadd',orderKey,userId)
--3.6发送消息到队列,XADD stream.orders * k1 v1 k2 v2
redis.call('xadd','stream.orders','*',"userId",userId,"voucherId",voucherId,"id",orderId)
return 0
代码实现
package com.hmdp.service.impl;
import cn.hutool.core.bean.BeanUtil;
import com.hmdp.dto.Result;
import com.hmdp.entity.SeckillVoucher;
import com.hmdp.entity.VoucherOrder;
import com.hmdp.mapper.VoucherOrderMapper;
import com.hmdp.service.ISeckillVoucherService;
import com.hmdp.service.IVoucherOrderService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.hmdp.utils.RedisWorker;
import com.hmdp.utils.UserHolder;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.connection.stream.*;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.time.Duration;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
import static com.hmdp.utils.RedisConstants.*;
/**
* <p>
* 服务实现类
* </p>
*
* @author 虎哥
* @since 2021-12-22
*/
@Service
@Slf4j
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
@Autowired
private ISeckillVoucherService iSeckillVoucherService;
@Autowired
private StringRedisTemplate stringRedisTemplate;
@Autowired
private RedissonClient redissonClient;
@Autowired
private RedisWorker redisWorker;
/**
* 建议放到yml配置文件中
*/
private static final String LUA_FILE_PATH="seckill.lua";
private static final String ORDER = "order";
private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
static {
SECKILL_SCRIPT=new DefaultRedisScript<>();
SECKILL_SCRIPT.setLocation(new ClassPathResource(LUA_FILE_PATH));
SECKILL_SCRIPT.setResultType(Long.class);
}
/**
* 异步线程
*/
private static final ExecutorService SECKILL_ORDER_EXECUTOR= Executors.newSingleThreadExecutor();
@PostConstruct
public void init(){
//如果指定stream队列关联的消费者组已经存在,则不进行处理
if (!targetGroupExistsInStream(STREAM_QUEUE_NAME,STREAM_GROUP_NAME)) {
//这里createGroup的mkStream为true,表示在创建消费者组时,如果关联的stream队列不存在,也会自动创建
stringRedisTemplate.opsForStream().createGroup(STREAM_QUEUE_NAME,ReadOffset.from("0"),STREAM_GROUP_NAME);
}
//异步监听任务执行
SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
}
private boolean targetGroupExistsInStream(String streamName,String groupName) {
//如果stream队列不存在
if(stringRedisTemplate.countExistingKeys(Collections.singletonList(streamName))==0){
return false;
}
StreamInfo.XInfoGroups order_stream_groups = stringRedisTemplate.opsForStream().groups(streamName);
Iterator<StreamInfo.XInfoGroup> iterator = order_stream_groups.iterator();
while(iterator.hasNext()){
StreamInfo.XInfoGroup xInfoGroup = iterator.next();
if(xInfoGroup.groupName().equals(groupName)){
return true;
}
}
return false;
}
public class VoucherOrderHandler implements Runnable{
@Override
public void run() {
while(true){
try {
//1.获取队列中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 >
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
//消费组的名字和消费者的名字
Consumer.from(STREAM_GROUP_NAME, STREAM_GROUP_CONSUMER_NAME),
StreamReadOptions.empty().count(1)
//我这里设置为一直阻塞,直到有消息可读为止
.block(Duration.ofSeconds(0)),
//从哪个Stream队列进行消息读取,此处读取方式为>
StreamOffset.create(STREAM_QUEUE_NAME, ReadOffset.lastConsumed())
);
if (handleMsgFromStream(list)) {
continue;
}
} catch (Exception e) {
log.error("处理消息异常");
//处理Pending队列中消息
handlePendingList();
}
}
}
private void handlePendingList() {
while(true){
try {
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from(STREAM_GROUP_NAME, STREAM_GROUP_CONSUMER_NAME),
StreamReadOptions.empty().count(1),
//从头开始消费pending队列中所有消息
StreamOffset.create(STREAM_QUEUE_NAME, ReadOffset.from("0"))
);
if(handleMsgFromStream(list)){
//Pending队列中没有错误消息,那么直接退出循环
break;
}
} catch (Exception e) {
//处理Pending队列中的异常消息
//可以在这里做一些异常记录等
try {
//避免循环频率过高
Thread.sleep(3000);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
}
}
private boolean handleMsgFromStream(List<MapRecord<String, Object, Object>> list) {
//2.判断订单信息是否为空
if(list ==null || list.isEmpty()){
//如果为null,说明没有消息
return true;
}
//解析消息
MapRecord<String, Object, Object> record = list.get(0);
Map<Object, Object> value = record.getValue();
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
//3.创建订单
createVoucherOrder(voucherOrder);
//4.确认消息
stringRedisTemplate.opsForStream().acknowledge(STREAM_QUEUE_NAME,STREAM_GROUP_NAME,record.getId());
return false;
}
/**
* 保守起见,还会再次进行判断
*/
private void createVoucherOrder(VoucherOrder voucherOrder) {
Long userId = voucherOrder.getUserId();
Long voucherId = voucherOrder.getVoucherId();
//创建锁对象
RLock lock = redissonClient.getLock(LOCK_ORDER_KEY + userId);
//尝试获取分布式锁
// 第一个参数为获取锁的最大等待时间(期间会重试)--默认-1,,失败直接返回
//锁自动释放时间--默认30秒
//时间单位
boolean tryLock = lock.tryLock();
if(!tryLock){
log.error("用户["+userId+"]对"+voucherId+"优惠卷重复抢购");
return;
}
//我们只需要确保下面这两行代码的集群并发问题被解决
try{
Integer count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
if(count>0){
log.error("用户["+userId+"]对"+voucherId+"优惠卷重复抢购");
return;
}
}finally {
lock.unlock();
}
//6.扣减库存
boolean success = iSeckillVoucherService.update()
.setSql("stock = stock -1")
.eq("voucher_id", voucherId)
.gt("stock",0)
.update();
if(!success){
log.error("库存扣减失败");
return;
}
save(voucherOrder);
}
}
@Override
public Result seckillVoucher(Long voucherId) {
Long uid = UserHolder.getUser().getId();
Long orderId = redisWorker.nextId(ORDER);
//1.执行lua脚本
Long res = stringRedisTemplate.execute(SECKILL_SCRIPT, Collections.emptyList(), voucherId.toString(), uid.toString(),orderId.toString());
//2.判断结果是否为0
int r=res.intValue();
if(r!=0){
//说明redis中没有当前优惠卷的库存记录
//去数据库查询该优惠卷是否存在,如果不存在,说明是恶意访问
//如果存在,就更新redis记录
if(r==3){
if(handleUnKnownVoucherId(voucherId)){
seckillVoucher(voucherId);
}else {
return Result.fail("指定优惠卷不存在");
}
}
return Result.fail(r==1?"库存不足":"不能重复下单");
}
//3.为0,有购买资格,把下单信息保存到阻塞队列
VoucherOrder voucherOrder = new VoucherOrder();
voucherOrder.setId(orderId);
voucherOrder.setUserId(uid);
voucherOrder.setVoucherId(voucherId);
//4.返回订单id
return Result.ok(orderId);
}
private Boolean handleUnKnownVoucherId(Long voucherId) {
SeckillVoucher voucher = iSeckillVoucherService.getById(voucherId);
if(voucher==null){
return false;
}
//更新redis记录
stringRedisTemplate.opsForValue().set(SECKILL_STOCK_KEY+voucherId,voucher.getStock().toString());
return true;
}
}
本节最重要的地方还是在redis的Stream消息队列,并且也花了大量篇幅去讲解加实践,当然一切还是以官方文档为主,因此建议大家可以没事去看看redis的stream部分文档
关闭。这个问题是opinion-based .它目前不接受答案。 想要改进这个问题? 更新问题,以便 editing this post 可以用事实和引用来回答它. 关闭 9 年前。 Improve
介绍篇 什么是MiniApis? MiniApis的特点和优势 MiniApis的应用场景 环境搭建 系统要求 安装MiniApis 配置开发环境 基础概念 MiniApis架构概述
我正在从“JavaScript 圣经”一书中学习 javascript,但我遇到了一些困难。我试图理解这段代码: function checkIt(evt) { evt = (evt) ? e
package com.fastone.www.javademo.stringintern; /** * * String.intern()是一个Native方法, * 它的作用是:如果字
您会推荐哪些资源来学习 AppleScript。我使用具有 Objective-C 背景的传统 C/C++。 我也在寻找有关如何更好地开发和从脚本编辑器获取更快文档的技巧。示例提示是“查找要编写脚本的
关闭。这个问题不满足Stack Overflow guidelines .它目前不接受答案。 想改善这个问题吗?更新问题,使其成为 on-topic对于堆栈溢出。 4年前关闭。 Improve thi
关闭。这个问题不满足Stack Overflow guidelines .它目前不接受答案。 想改善这个问题吗?更新问题,使其成为 on-topic对于堆栈溢出。 7年前关闭。 Improve thi
关闭。这个问题不符合 Stack Overflow guidelines 。它目前不接受答案。 想改善这个问题吗?更新问题,以便堆栈溢出为 on-topic。 6年前关闭。 Improve this
我是塞内加尔的阿里。我今年60岁(也许这是我真正的问题-笑脸!!!)。 我正在学习Flutter和Dart。今天,我想使用给定数据模型的列表(它的名称是Mortalite,请参见下面的代码)。 我尝试
关闭。这个问题是off-topic .它目前不接受答案。 想改进这个问题? Update the question所以它是on-topic对于堆栈溢出。 9年前关闭。 Improve this que
学习 Cappuccino 的最佳来源是什么?我从事“传统”网络开发,但我对这个新框架非常感兴趣。请注意,我对 Objective-C 毫无了解。 最佳答案 如上所述,该网站是一个好地方,但还有一些其
我正在学习如何使用 hashMap,有人可以检查我编写的这段代码并告诉我它是否正确吗?这个想法是有一个在公司工作的员工列表,我想从 hashMap 添加和删除员工。 public class Staf
我正在尝试将 jQuery 与 CoffeScript 一起使用。我按照博客中的说明操作,指示使用 $ -> 或 jQuery -> 而不是 .ready() 。我玩了一下代码,但我似乎无法理解我出错
还在学习,还有很多问题,所以这里有一些。我正在进行 javascript -> PHP 转换,并希望确保这些做法是正确的。是$dailyparams->$calories = $calories;一条
我目前正在学习 SQL,以便从我们的 Magento 数据库制作一个简单的 RFM 报告,我目前可以通过导出两个查询并将它们粘贴到 Excel 模板中来完成此操作,我想摆脱 Excel 模板。 我认为
我知道我很可能会因为这个问题而受到抨击,但没有人问,我求助于你。这是否是一个正确的 javascript > php 转换 - 在我开始不良做法之前,我想知道这是否是解决此问题的正确方法。 JavaS
除了 Ruby-Doc 之外,哪些来源最适合获取一些示例和教程,尤其是关于 Ruby 中的 Tk/Tile?我发现自己更正常了 http://www.tutorialspoint.com/ruby/r
我只在第一次收到警告。这正常吗? >>> cv=LassoCV(cv=10).fit(x,y) C:\Python27\lib\site-packages\scikit_learn-0.14.1-py
按照目前的情况,这个问题不适合我们的问答形式。我们希望答案得到事实、引用或专业知识的支持,但这个问题可能会引发辩论、争论、投票或扩展讨论。如果您觉得这个问题可以改进并可能重新打开,visit the
As it currently stands, this question is not a good fit for our Q&A format. We expect answers to be
我是一名优秀的程序员,十分优秀!