- Java锁的逻辑(结合对象头和ObjectMonitor)
- 还在用饼状图?来瞧瞧这些炫酷的百分比可视化新图形(附代码实现)⛵
- 自动注册实体类到EntityFrameworkCore上下文,并适配ABP及ABPVNext
- 基于Sklearn机器学习代码实战
借助redis Stream 数据结构实现消息队列,异步完成订单创建,其中涉及到了缓存(击穿,穿透,雪崩),锁(Redisson),并发处理,异步处理,Lua脚本 。
IDE:IDEA 2022 。
。
。
在完成功能之前 ,需要了解一下redis 中有关stream 数据结构相关的命令 。
XACK:确认消息已经处理,redis 会在PEL(pending entries List )中移除一个或多个消息。一般情况下 一个消息被 XREADGROUP 或 XCLAIM之后会被写入PEL.
XADD: 把消息(Entry ,key-value)追加到队列,默认如果队列不存在会创建,除非使用 NOMKSTREAM ,之后 可以通过XREAD ,XREANGE 等命令读取或通过XDEL,XTRIM移除消息 。
XCLAIM,XAUTOCLAIM 改变PEL 中的消息的所有者 。
XDEL:在队列中移除一个或多个消息(entry) 。
XGROUP CREATE :在指定的队列中创建一个消费者组,队列key有且仅能有一个,否则重复会提示:-BUSYGROUP 不存在会提示:ERR no such key ,可以通过选项 MKSTREAM 在不存在时创建 。
XGROUP CREATECONSUMER:在一个给定的队列和消费者组中创建一个消费者,不能重复。此外在任何使用到消费者的命令中,如果不存在则自动创建。如:XREADGROUP 。
XGROUP DELCONSUMER:在一个给定的队列和消费者组中移除一个消费者 在执行此命令之前 相关的PEL 要先执行 XCLAIM 或 XACK进行处理,否则将变得 unclaimable. 。
XGROUP DESTROY:在给定的队列中删除一个消费者组,相关的consumers 和 PEL 都会被 删除,所以执行之前要慎重 。
XGROUP SETID :重新设置指定的消费者组的最后一个处理的消息的ID,通常这个ID初始值 是XGROUP CREATE时指定的,每次执行XREDGROUP 也会修改(更新)这个ID, 。
例如:想让组内的消费者重新处理队列中的所有的entrys时,可以 XGROUP SETID streamkey groupkey 0,在redis 7.0增加了参数 ENTRIESREAD n ,n 为已读数量 , 。
此时,xinfo groups streamKey 可以看到组信息如下:
127.0 . 0.1 : 6388 > xread count 20 streams s1 0 1 ) 1 ) " s1 " 2 ) 1 ) 1 ) " 1678852071712-0 " 2 ) 1 ) " key1 " 2 ) " vlaue1 " 2 ) 1 ) " 1678852073882-0 " 2 ) 1 ) " key1 " 2 ) " vlaue1 " 3 ) 1 ) " 1678852080406-0 " 2 ) 1 ) " key2 " 2 ) " vlaue2 " 4 ) 1 ) " 1678852588261-0 " 2 ) 1 ) " key-1 " 2 ) " vlaue2 " 5 ) 1 ) " 1678852591957-0 " 2 ) 1 ) " key-2 " 2 ) " vlaue2 " 6 ) 1 ) " 1678852595467-0 " 2 ) 1 ) " key-3 " 2 ) " vlaue2 " 7 ) 1 ) " 1678852599576-0 " 2 ) 1 ) " key-4 " 2 ) " vlaue2 " 8 ) 1 ) " 1678852616566-0 " 2 ) 1 ) " key-4 " 2 ) " vlaue2 " 9 ) 1 ) " 1678852946989-0 " 2 ) 1 ) " key-5 " 2 ) " vlaue2 " 127.0 . 0.1 : 6388 > xinfo groups s1 1 ) 1 ) " name " 2 ) " g1 " 3 ) " consumers " 4 ) (integer) 1 5 ) " pending " 6 ) (integer) 5 7 ) " last-delivered-id " 8 ) " 1678852080406-0 " 9 ) " entries-read " 10 ) (integer) 9 11 ) " lag " 12 ) (integer) 0 127.0 . 0.1 : 6388 > xreadgroup group g1 lihui count 1 streams s1 > 1 ) 1 ) " s1 " 2 ) 1 ) 1 ) " 1678852588261-0 " 2 ) 1 ) " key-1 " 2 ) " vlaue2 " 127.0 . 0.1 : 6388 > xinfo groups s1 1 ) 1 ) " name " 2 ) " g1 " 3 ) " consumers " 4 ) (integer) 1 5 ) " pending " 6 ) (integer) 5 7 ) " last-delivered-id " 8 ) " 1678852588261-0 " 9 ) " entries-read " 10 ) (integer) 10 11 ) " lag " 12 ) (integer) - 1 127.0 . 0.1 : 6388 > xlen s1 (integer) 9 127.0 . 0.1 : 6388 > xgroup setid s1 g1 1678852073882 - 0 ENTRIESREAD 2 OK 127.0 . 0.1 : 6388 > xinfo groups s1 1 ) 1 ) " name " 2 ) " g1 " 3 ) " consumers " 4 ) (integer) 1 5 ) " pending " 6 ) (integer) 5 7 ) " last-delivered-id " 8 ) " 1678852073882-0 " 9 ) " entries-read " 10 ) (integer) 2 11 ) " lag " 12 ) (integer) 7
XINFO [SRTREAM,GROUPS,CONSUMERS] :获取队列,组,消费者的信息 。
XLEN:返回 stream中entrys的数量 。
XPENDING :查询通过消费者组读取但未被确认的entrys 。
XRANGE :返回队列中的某个区间内的entrys 。
XREVRANGE: 顺序说XRANGE相反的读取队列中的entry 。
XREAD:从队列 内读取一个或多个entry ,支持block 。
XREADGROUP:在XREAD的基础上在队列与消费者之间增加了组的概念 。
XSETID:是一个redis 内部命令,用于记录master 与replicate之间的数据 同步的最后一个ID 的记录 。
。
好了,下面是是代码 。
Lua 脚本 完成 相关功能 。
--[[ 判断优惠券是否充足 优惠券 id ARGV[2] ,key KEYS[1] 判断当前用户是否已经下单 用户id ARGV[1] 如果库存充足,且用户没有下单 则 1、扣库存 2、保存用户下单信息 订单key KEYS[2] 订单ID ARGV[3] stream 队列 key KEYS[3] consumer group key KEYS[4] 利用redis 中set 数据 类型的不重复性,进行重复下单信息的记录 -- ]] local sec_kc=redis.call( ' get ' ,KEYS[ 1 ]) -- if(not sec_kc) then -- redis.call('set',KEYS[1],2000) -- sec_kc=2000 -- end if ( not sec_kc or sec_kc== ' {} ' ) then -- 库存 不足 return 3 end if ( tonumber (sec_kc)<= 0 ) then -- 库存 不足 return 1 end -- 完成一人一单重复检测 if (redis.call( ' SISMEMBER ' ,KEYS[ 2 ],ARGV[ 1 ])== 1 ) then -- 订单重复 return 2 end -- 扣减库存 redis.call( ' incrby ' ,KEYS[ 1 ],- 1 ) -- 记录订单与用户之间关系 set 数据 redis.call( ' sadd ' ,KEYS[ 2 ],ARGV[ 1 ]) -- 获取stream队列中entry数量 --[=[ local streamcount=redis.call('xlen',KEYS[3]) --此功能转到java 中完成 CreateStreamAndGroup if(streamcount==0) then--没有stream 需要创建 -- XGROUP CREATE stream.order group1 0 mkstream redis.call('XGROUP','CREATE',KEYS[3],KEYS[4],'0','mkstream') end -- ]=] -- 判断队列是否存在 local streamExists=redis.call( ' exists ' ,KEYS[ 3 ]) if (streamExists== 0 ) then return 4 -- 队列不存在 end -- 创建消息entry -- XADD S1 * KEY12 VALUE12 redis.call( ' XADD ' ,KEYS[ 3 ], ' * ' , ' userId ' ,ARGV[ 1 ], ' voucherId ' ,ARGV[ 2 ], ' id ' ,ARGV[ 3 ]) -- key 的取值与对象 VoucherOrder 中变量属性对应,分别是,userId,voucherId,id(订单id) -- 满足下单条件 return 0
。
因为在redis 的stream 命令中 没有判断 consumer group 是否存在,所以需要变通的方法,解决这个问题,因为如果组不存的情况下,进行xadd 会报错.
创建队列和消费者组 。
/** * 查询队列信息,如果不存在会出现异常,在异常中创建队列 * @param streamKey * @param groupKey */ private void CreateStreamAndGroup(String streamKey,String groupKey) { try { // 默认队列和组不存 创建队列 stringRedisTemplate.opsForStream().createGroup(streamKey, ReadOffset.from("0" ), groupKey); System.out.println(streamKey + "队列" + streamKey + "和组" + groupKey + "创建成功。" ); } catch ( Exception ex) { String errx = ex.getMessage().toString(); if (errx.indexOf("BUSYGROUP Consumer Group name already exists")>0) // 队列与组都已经存在 { // System.out.println(streamKey + "队列" + streamKey + "和组" + groupKey + "都已经存在"); } else { log.debug(ex.getMessage()); } } }
判断队列是否存在 。
/** * 判断consumer group 是否存在,如果队列不存在直接返回 false, * 如果队列存在,不论组是否存,都直接创建。 */ private boolean ConsumerGroupExists(String streamKey,String groupKey) { boolean isok= false ; try { // 默认队列和组不存 创建队列 Collection<String> streamCollect= new ArrayList<> (); streamCollect.add(streamKey); Long c_stream = stringRedisTemplate.countExistingKeys(streamCollect); if (c_stream==0 ) { isok = false ; // 队列不存在 return isok; } // 队列存在 ,那么直接创建消费者组, stringRedisTemplate.opsForStream().createGroup(streamKey, ReadOffset.from("0" ), groupKey); System.out.println(streamKey + "队列" + streamKey + "存在,组" + groupKey + "创建成功。" ); isok = true ; // 组创建成功 } catch ( Exception ex) { String errx = ex.getMessage().toString(); if (errx.indexOf("BUSYGROUP Consumer Group name already exists")>0) // 队列与组都已经存在 { isok = true ; // 组创建成功 // System.out.println(streamKey + "队列" + streamKey + "和组" + groupKey + "都已经存在"); } else { isok = false ; // 未知异常 log.debug(ex.getMessage()); } } finally { return isok; } }
。
订单创建入口函数(放在controller 或 seervice impl) 中 。
public Result secKillVoucher(Long voucherId) { long userID = UserHolder.getUser().getId(); String vouchStockKey = RedisConstants.SECKILL_VOUCHERSTOCK_CACHE_KEY + voucherId; String userid_s = String.valueOf(userID); String vouchOrderKey = RedisConstants.SECKILL_ORDER_CACHE_KEY + voucherId; String streamKey = RedisConstants.REDIS_STREAM_QUEUE_ORDER ; String groupKey = RedisConstants.REDIS_STREAM_GROUP_ORDER; List <String> keys = new ArrayList<> (); keys.add(vouchStockKey); // 库存key KEYS[1] keys.add(vouchOrderKey); // 订单key KEYS[2] keys.add(streamKey); // 队列key KEYS[3] keys.add(groupKey); // 消费组key KEYS[4] // check sec_voucher_stock // 1 查询优惠券 int count = seckillVoucherService.getRedisStock(voucherId); if (count <= 0 ) { return Result.fail("优惠券库存不存在 count:" + count); } long orderid = redisIdWorker.nextId(RedisConstants.ID_ORDER); CreateStreamAndGroup(streamKey,groupKey); // 订单ID ARGV[3] // 优惠券 id ARGV[2] // 用户id ARGV[1] // 执行lua 完成优惠券库存,一人一单,检查 并创建用户与优惠券ID之间的关联 // 脚本完成 创建消息队列 (执行此脚本之前确保队列 已经创建 Long sekResult = stringRedisTemplate.execute(SECKILL_SCRIPT, keys , userid_s,String.valueOf(voucherId),String.valueOf(orderid) ); int ri = sekResult.intValue(); System.out.println( "lua 脚本执行返回值 :"+ ri); if (ri!=0 ) { String errtip="优惠券库存不足" ; switch (ri) { case 2 : errtip ="订单重复一人一单" ; break ; case 3 : errtip ="库存数据不存在" ; break ; case 4 : errtip ="消息队列不存在" ; break ; case 1 : errtip ="优惠券库存不足" ; break ; default : errtip ="未知错误" ; } return Result.fail(errtip); } /** 注意:下面的代理对象要有 */ proxy = (IVoucherOrderService)AopContext.currentProxy(); return Result.ok(orderid); }
。
创建线程获取消息队列并处理 。
private static final ExecutorService SECKILL_ORDER_EXECUTOR= Executors.newSingleThreadExecutor(); @PostConstruct private void init(){ // 本地阻塞队列的方式 // SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler()); // 读取redis消息队列并处理 SECKILL_ORDER_EXECUTOR.submit( new VoucherOrderHandlerRedisQueue()); } // 从redis 消息队列中获取消息 private class VoucherOrderHandlerRedisQueue implements Runnable{ @Override public void run() { while ( true ){ // get orderinfo from blockingqueen try { String streamKey = RedisConstants.REDIS_STREAM_QUEUE_ORDER ; String groupKey = RedisConstants.REDIS_STREAM_GROUP_ORDER; String cs = RedisConstants.REDIS_STREAM_CONSUMER_ORDER; // 判断组是否存在 if (! ConsumerGroupExists(streamKey,groupKey)) { // log.error("异步线程读取redis stream 失败:队列 未创建:"+streamKey); Thread.sleep(3000 ); continue ; } // 从消息队列中获取消息 List<MapRecord<String, Object, Object>> queueList = stringRedisTemplate.opsForStream() .read(Consumer.from( groupKey,cs), StreamReadOptions.empty().count( 1).block(Duration.ofSeconds(2 )) , StreamOffset.create(streamKey,ReadOffset.lastConsumed()) ); if (queueList== null || queueList.isEmpty() ) { // 未获取到消息 continue ; // continue do next } // 处理消息 MapRecord<String, Object, Object> record = queueList.get(0 ); Map <Object, Object> vq = record.getValue(); VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(vq, new VoucherOrder(), false ); // 成功获取 创建订单到数据库 handlerVoucherOrder(voucherOrder); // 确认消息 在PEL中移除 stringRedisTemplate.opsForStream().acknowledge(streamKey,groupKey,record.getId()); } catch (Exception e) { HandlePendingList(); log.error(e.getMessage().toString()); // throw new RuntimeException(e); } } }
异常处理 在redis stream pending List 中获取 队列 。
private void HandlePendingList() { String streamKey = RedisConstants.REDIS_STREAM_QUEUE_ORDER ; // stream String groupKey = RedisConstants.REDIS_STREAM_GROUP_ORDER; // consumer group String cs=RedisConstants.REDIS_STREAM_CONSUMER_ORDER; // consumer while ( true ) try { { // read from pel List<MapRecord<String, Object, Object>> queueList = stringRedisTemplate.opsForStream() .read(Consumer.from ( groupKey,cs), StreamReadOptions.empty().count( 1 ) , StreamOffset.create(streamKey, ReadOffset.from( "0" )) ); if (queueList== null || queueList.isEmpty()) { // 未获取到消息 break ; // continue do next normal } // 处理消息 MapRecord<String, Object, Object> record = queueList.get(0 ); Map <Object, Object> vq = record.getValue(); VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(vq, new VoucherOrder(), false ); // 成功获取 handlerVoucherOrder(voucherOrder); // 确认消息 stringRedisTemplate.opsForStream().acknowledge(streamKey,groupKey,record.getId()); } } catch (Exception e) { log.debug( "消息队列--peding List 处理异常" ); try { Thread.sleep( 50 ); } catch (InterruptedException ex) { throw new RuntimeException(ex); } } } }
。
订单处理 。
private IVoucherOrderService proxy; private void handlerVoucherOrder(VoucherOrder voucherOrder) { if (voucherOrder== null || voucherOrder.getUserId()== null ) { log.debug( "对象为空 或 属性用户ID 为空。" ); } // user ID long userID= voucherOrder.getUserId(); RLock lock =redisson.getLock(RedisConstants.LOCK_VOUCHERORDER_KEY+ StrUtil.toString(userID)); boolean islock = lock.tryLock(); // active watch dog if (! islock) { log.error( "锁创建失败" ); } try { // IVoucherOrderService proxy=(IVoucherOrderService)AopContext.currentProxy(); // 获取spring 对当前对象的代理 proxy.createVoucherOrderByObj(voucherOrder); } catch (IllegalStateException e) { throw new RuntimeException(e); } finally { // lock.unLock(); // my define simple redis lock lock.unlock(); // redisson release lock } }
实现类中的方法 。
。
@Transactional // 因为方法中 订单的操作 和 库存扣减,所以增加事务支持 为防止数据 不同步 public void createVoucherOrderByObj(VoucherOrder voucher) { // user ID if (voucher== null || voucher.getUserId()== null ) { log.debug( "对象为空 或 属性用户ID 为空。" ); return ; } long userID= voucher.getUserId(); int count=query().eq("user_id",userID).eq("voucher_id" , voucher.getVoucherId()).count(); if (count>0 ) { log.error( "优惠券仅限每人一个 count:"+ count); return ; } // 5扣减库存 boolean success= seckillVoucherService.update() .setSql( "stock=stock-1" ) .eq( "voucher_id", voucher.getVoucherId()).gt("stock",0 ) .update(); if (! success) { log.error( "扣减券库存失败Obj: voucher.getVoucherId():"+ voucher.getVoucherId()); return ; } System.out.println( "voucher saved" ); boolean ds = save(voucher); }
。
最后此篇关于Java+Redis通过Lua完成库存扣减,创建消息队列,异步处理消息--实战的文章就讲到这里了,如果你想了解更多关于Java+Redis通过Lua完成库存扣减,创建消息队列,异步处理消息--实战的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
我是一名优秀的程序员,十分优秀!