gpt4 book ai didi

Java+Redis通过Lua完成库存扣减,创建消息队列,异步处理消息--实战

转载 作者:我是一只小鸟 更新时间:2023-03-15 22:31:43 26 4
gpt4 key购买 nike

需要完成功能

借助redis Stream 数据结构实现消息队列,异步完成订单创建,其中涉及到了缓存(击穿,穿透,雪崩),锁(Redisson),并发处理,异步处理,Lua脚本 。

IDE:IDEA 2022 。

  。

1、读取库存数据 【Lua】  

2、判断库存 【Lua】

3、扣减库存 【Lua】

4、创建队列和组 【Java】

5、发送队列消息 【Lua】

6、消息读取并处理 【Java】

  。

在完成功能之前 ,需要了解一下redis 中有关stream 数据结构相关的命令 。

XACK:确认消息已经处理,redis 会在PEL(pending entries List )中移除一个或多个消息。一般情况下 一个消息被 XREADGROUP 或 XCLAIM之后会被写入PEL.

XADD: 把消息(Entry  ,key-value)追加到队列,默认如果队列不存在会创建,除非使用 NOMKSTREAM ,之后 可以通过XREAD ,XREANGE 等命令读取或通过XDEL,XTRIM移除消息 。

XCLAIM,XAUTOCLAIM 改变PEL 中的消息的所有者 。

XDEL:在队列中移除一个或多个消息(entry) 。

XGROUP CREATE :在指定的队列中创建一个消费者组,队列key有且仅能有一个,否则重复会提示:-BUSYGROUP 不存在会提示:ERR no such key ,可以通过选项 MKSTREAM 在不存在时创建 。

XGROUP CREATECONSUMER:在一个给定的队列和消费者组中创建一个消费者,不能重复。此外在任何使用到消费者的命令中,如果不存在则自动创建。如:XREADGROUP 。

XGROUP DELCONSUMER:在一个给定的队列和消费者组中移除一个消费者  在执行此命令之前 相关的PEL 要先执行 XCLAIM 或 XACK进行处理,否则将变得 unclaimable. 。

XGROUP DESTROY:在给定的队列中删除一个消费者组,相关的consumers 和 PEL 都会被 删除,所以执行之前要慎重 。

XGROUP SETID :重新设置指定的消费者组的最后一个处理的消息的ID,通常这个ID初始值 是XGROUP CREATE时指定的,每次执行XREDGROUP 也会修改(更新)这个ID, 。

        例如:想让组内的消费者重新处理队列中的所有的entrys时,可以 XGROUP SETID streamkey groupkey 0,在redis 7.0增加了参数 ENTRIESREAD n ,n 为已读数量 ,     。

        此时,xinfo groups streamKey 可以看到组信息如下:

                  
                    127.0
                  
                  .
                  
                    0.1
                  
                  :
                  
                    6388
                  
                  > xread count 
                  
                    20
                  
                   streams s1 
                  
                    0
                  
                  
                    1
                  
                  ) 
                  
                    1
                  
                  ) 
                  
                    "
                  
                  
                    s1
                  
                  
                    "
                  
                  
                    2
                  
                  ) 
                  
                    1
                  
                  ) 
                  
                    1
                  
                  ) 
                  
                    "
                  
                  
                    1678852071712-0
                  
                  
                    "
                  
                  
                    2
                  
                  ) 
                  
                    1
                  
                  ) 
                  
                    "
                  
                  
                    key1
                  
                  
                    "
                  
                  
                    2
                  
                  ) 
                  
                    "
                  
                  
                    vlaue1
                  
                  
                    "
                  
                  
                    2
                  
                  ) 
                  
                    1
                  
                  ) 
                  
                    "
                  
                  
                    1678852073882-0
                  
                  
                    "
                  
                  
                    2
                  
                  ) 
                  
                    1
                  
                  ) 
                  
                    "
                  
                  
                    key1
                  
                  
                    "
                  
                  
                    2
                  
                  ) 
                  
                    "
                  
                  
                    vlaue1
                  
                  
                    "
                  
                  
                    3
                  
                  ) 
                  
                    1
                  
                  ) 
                  
                    "
                  
                  
                    1678852080406-0
                  
                  
                    "
                  
                  
                    2
                  
                  ) 
                  
                    1
                  
                  ) 
                  
                    "
                  
                  
                    key2
                  
                  
                    "
                  
                  
                    2
                  
                  ) 
                  
                    "
                  
                  
                    vlaue2
                  
                  
                    "
                  
                  
                    4
                  
                  ) 
                  
                    1
                  
                  ) 
                  
                    "
                  
                  
                    1678852588261-0
                  
                  
                    "
                  
                  
                    2
                  
                  ) 
                  
                    1
                  
                  ) 
                  
                    "
                  
                  
                    key-1
                  
                  
                    "
                  
                  
                    2
                  
                  ) 
                  
                    "
                  
                  
                    vlaue2
                  
                  
                    "
                  
                  
                    5
                  
                  ) 
                  
                    1
                  
                  ) 
                  
                    "
                  
                  
                    1678852591957-0
                  
                  
                    "
                  
                  
                    2
                  
                  ) 
                  
                    1
                  
                  ) 
                  
                    "
                  
                  
                    key-2
                  
                  
                    "
                  
                  
                    2
                  
                  ) 
                  
                    "
                  
                  
                    vlaue2
                  
                  
                    "
                  
                  
                    6
                  
                  ) 
                  
                    1
                  
                  ) 
                  
                    "
                  
                  
                    1678852595467-0
                  
                  
                    "
                  
                  
                    2
                  
                  ) 
                  
                    1
                  
                  ) 
                  
                    "
                  
                  
                    key-3
                  
                  
                    "
                  
                  
                    2
                  
                  ) 
                  
                    "
                  
                  
                    vlaue2
                  
                  
                    "
                  
                  
                    7
                  
                  ) 
                  
                    1
                  
                  ) 
                  
                    "
                  
                  
                    1678852599576-0
                  
                  
                    "
                  
                  
                    2
                  
                  ) 
                  
                    1
                  
                  ) 
                  
                    "
                  
                  
                    key-4
                  
                  
                    "
                  
                  
                    2
                  
                  ) 
                  
                    "
                  
                  
                    vlaue2
                  
                  
                    "
                  
                  
                    8
                  
                  ) 
                  
                    1
                  
                  ) 
                  
                    "
                  
                  
                    1678852616566-0
                  
                  
                    "
                  
                  
                    2
                  
                  ) 
                  
                    1
                  
                  ) 
                  
                    "
                  
                  
                    key-4
                  
                  
                    "
                  
                  
                    2
                  
                  ) 
                  
                    "
                  
                  
                    vlaue2
                  
                  
                    "
                  
                  
                    9
                  
                  ) 
                  
                    1
                  
                  ) 
                  
                    "
                  
                  
                    1678852946989-0
                  
                  
                    "
                  
                  
                    2
                  
                  ) 
                  
                    1
                  
                  ) 
                  
                    "
                  
                  
                    key-5
                  
                  
                    "
                  
                  
                    2
                  
                  ) 
                  
                    "
                  
                  
                    vlaue2
                  
                  
                    "
                  
                  
                    127.0
                  
                  .
                  
                    0.1
                  
                  :
                  
                    6388
                  
                  > xinfo 
                  
                    groups
                  
                  
                     s1

                  
                  
                    1
                  
                  )  
                  
                    1
                  
                  ) 
                  
                    "
                  
                  
                    name
                  
                  
                    "
                  
                  
                    2
                  
                  ) 
                  
                    "
                  
                  
                    g1
                  
                  
                    "
                  
                  
                    3
                  
                  ) 
                  
                    "
                  
                  
                    consumers
                  
                  
                    "
                  
                  
                    4
                  
                  ) (integer) 
                  
                    1
                  
                  
                    5
                  
                  ) 
                  
                    "
                  
                  
                    pending
                  
                  
                    "
                  
                  
                    6
                  
                  ) (integer) 
                  
                    5
                  
                  
                    7
                  
                  ) 
                  
                    "
                  
                  
                    last-delivered-id
                  
                  
                    "
                  
                  
                    8
                  
                  ) 
                  
                    "
                  
                  
                    1678852080406-0
                  
                  
                    "
                  
                  
                    9
                  
                  ) 
                  
                    "
                  
                  
                    entries-read
                  
                  
                    "
                  
                  
                    10
                  
                  ) (integer) 
                  
                    9
                  
                  
                    11
                  
                  ) 
                  
                    "
                  
                  
                    lag
                  
                  
                    "
                  
                  
                    12
                  
                  ) (integer) 
                  
                    0
                  
                  
                    127.0
                  
                  .
                  
                    0.1
                  
                  :
                  
                    6388
                  
                  > xreadgroup group g1 lihui  count 
                  
                    1
                  
                   streams s1 >

                  
                    1
                  
                  ) 
                  
                    1
                  
                  ) 
                  
                    "
                  
                  
                    s1
                  
                  
                    "
                  
                  
                    2
                  
                  ) 
                  
                    1
                  
                  ) 
                  
                    1
                  
                  ) 
                  
                    "
                  
                  
                    1678852588261-0
                  
                  
                    "
                  
                  
                    2
                  
                  ) 
                  
                    1
                  
                  ) 
                  
                    "
                  
                  
                    key-1
                  
                  
                    "
                  
                  
                    2
                  
                  ) 
                  
                    "
                  
                  
                    vlaue2
                  
                  
                    "
                  
                  
                    127.0
                  
                  .
                  
                    0.1
                  
                  :
                  
                    6388
                  
                  > xinfo 
                  
                    groups
                  
                  
                     s1

                  
                  
                    1
                  
                  )  
                  
                    1
                  
                  ) 
                  
                    "
                  
                  
                    name
                  
                  
                    "
                  
                  
                    2
                  
                  ) 
                  
                    "
                  
                  
                    g1
                  
                  
                    "
                  
                  
                    3
                  
                  ) 
                  
                    "
                  
                  
                    consumers
                  
                  
                    "
                  
                  
                    4
                  
                  ) (integer) 
                  
                    1
                  
                  
                    5
                  
                  ) 
                  
                    "
                  
                  
                    pending
                  
                  
                    "
                  
                  
                    6
                  
                  ) (integer) 
                  
                    5
                  
                  
                    7
                  
                  ) 
                  
                    "
                  
                  
                    last-delivered-id
                  
                  
                    "
                  
                  
                    8
                  
                  ) 
                  
                    "
                  
                  
                    1678852588261-0
                  
                  
                    "
                  
                  
                    9
                  
                  ) 
                  
                    "
                  
                  
                    entries-read
                  
                  
                    "
                  
                  
                    10
                  
                  ) (integer) 
                  
                    10
                  
                  
                    11
                  
                  ) 
                  
                    "
                  
                  
                    lag
                  
                  
                    "
                  
                  
                    12
                  
                  ) (integer) -
                  
                    1
                  
                  
                    127.0
                  
                  .
                  
                    0.1
                  
                  :
                  
                    6388
                  
                  >
                  
                     xlen s1
(integer) 
                  
                  
                    9
                  
                  
                    127.0
                  
                  .
                  
                    0.1
                  
                  :
                  
                    6388
                  
                  > xgroup setid s1 g1 
                  
                    1678852073882
                  
                  -
                  
                    0
                  
                    ENTRIESREAD 
                  
                    2
                  
                  
                    
OK

                  
                  
                    127.0
                  
                  .
                  
                    0.1
                  
                  :
                  
                    6388
                  
                  > xinfo 
                  
                    groups
                  
                  
                     s1

                  
                  
                    1
                  
                  )  
                  
                    1
                  
                  ) 
                  
                    "
                  
                  
                    name
                  
                  
                    "
                  
                  
                    2
                  
                  ) 
                  
                    "
                  
                  
                    g1
                  
                  
                    "
                  
                  
                    3
                  
                  ) 
                  
                    "
                  
                  
                    consumers
                  
                  
                    "
                  
                  
                    4
                  
                  ) (integer) 
                  
                    1
                  
                  
                    5
                  
                  ) 
                  
                    "
                  
                  
                    pending
                  
                  
                    "
                  
                  
                    6
                  
                  ) (integer) 
                  
                    5
                  
                  
                    7
                  
                  ) 
                  
                    "
                  
                  
                    last-delivered-id
                  
                  
                    "
                  
                  
                    8
                  
                  ) 
                  
                    "
                  
                  
                    1678852073882-0
                  
                  
                    "
                  
                  
                    9
                  
                  ) 
                  
                    "
                  
                  
                    entries-read
                  
                  
                    "
                  
                  
                    10
                  
                  ) (integer) 
                  
                    2
                  
                  
                    11
                  
                  ) 
                  
                    "
                  
                  
                    lag
                  
                  
                    "
                  
                  
                    12
                  
                  ) (integer) 
                  
                    7
                  
                
View Code

XINFO [SRTREAM,GROUPS,CONSUMERS] :获取队列,组,消费者的信息 。

XLEN:返回 stream中entrys的数量 。

XPENDING :查询通过消费者组读取但未被确认的entrys 。

XRANGE :返回队列中的某个区间内的entrys 。

XREVRANGE: 顺序说XRANGE相反的读取队列中的entry 。

XREAD:从队列 内读取一个或多个entry ,支持block  。

XREADGROUP:在XREAD的基础上在队列与消费者之间增加了组的概念 。

XSETID:是一个redis 内部命令,用于记录master 与replicate之间的数据 同步的最后一个ID 的记录 。

  。

好了,下面是是代码 。

Lua 脚本 完成 相关功能 。

                
                  --[[
                
                
                  
 判断优惠券是否充足  优惠券  id ARGV[2] ,key KEYS[1]
 判断当前用户是否已经下单 用户id ARGV[1]
 如果库存充足,且用户没有下单 则 1、扣库存 2、保存用户下单信息  订单key KEYS[2] 订单ID ARGV[3]
stream 队列 key  KEYS[3]
consumer group key   KEYS[4]
  利用redis 中set 数据 类型的不重复性,进行重复下单信息的记录
--
                
                
                  ]]
                
                
                  local
                
                 sec_kc=redis.call(
                
                  '
                
                
                  get
                
                
                  '
                
                ,KEYS[
                
                  1
                
                
                  ])

                
                
                  --
                
                
                  if(not sec_kc) then
                
                
                  
--
                
                
                      redis.call('set',KEYS[1],2000)
                
                
                  
--
                
                
                      sec_kc=2000
                
                
                  
--
                
                
                  end
                
                
                  if
                
                (
                
                  not
                
                 sec_kc 
                
                  or
                
                  sec_kc==
                
                  '
                
                
                  {}
                
                
                  '
                
                ) 
                
                  then
                
                
                  --
                
                
                  库存 不足
                
                
                  return
                
                
                  3
                
                
                  end
                
                
                  if
                
                (  
                
                  tonumber
                
                (sec_kc)<=
                
                  0
                
                ) 
                
                  then
                
                
                  --
                
                
                  库存 不足
                
                
                  return
                
                
                  1
                
                
                  end
                
                
                  --
                
                
                  完成一人一单重复检测
                
                
                  if
                
                (redis.call(
                
                  '
                
                
                  SISMEMBER
                
                
                  '
                
                ,KEYS[
                
                  2
                
                ],ARGV[
                
                  1
                
                ])==
                
                  1
                
                ) 
                
                  then
                
                
                  --
                
                
                  订单重复
                
                
                  return
                
                
                  2
                
                
                  end
                
                
                  --
                
                
                  扣减库存
                
                
redis.call(
                
                  '
                
                
                  incrby
                
                
                  '
                
                ,KEYS[
                
                  1
                
                ],-
                
                  1
                
                
                  )

                
                
                  --
                
                
                   记录订单与用户之间关系 set 数据
                
                
redis.call(
                
                  '
                
                
                  sadd
                
                
                  '
                
                ,KEYS[
                
                  2
                
                ],ARGV[
                
                  1
                
                
                  ])

                
                
                  --
                
                
                  获取stream队列中entry数量
                
                
                  --[=[
                
                
                  
local streamcount=redis.call('xlen',KEYS[3])
--此功能转到java 中完成 CreateStreamAndGroup
if(streamcount==0) then--没有stream 需要创建
    -- XGROUP CREATE stream.order group1 0 mkstream
    redis.call('XGROUP','CREATE',KEYS[3],KEYS[4],'0','mkstream')

end
--
                
                
                  ]=]
                
                
                  --
                
                
                  判断队列是否存在
                
                
                  local
                
                 streamExists=redis.call(
                
                  '
                
                
                  exists
                
                
                  '
                
                ,KEYS[
                
                  3
                
                
                  ])

                
                
                  if
                
                (streamExists==
                
                  0
                
                ) 
                
                  then
                
                
                  return
                
                
                  4
                
                
                  --
                
                
                  队列不存在
                
                
                  end
                
                
                  --
                
                
                   创建消息entry
                
                
                  
--
                
                
                  XADD S1 * KEY12 VALUE12
                
                
redis.call(
                
                  '
                
                
                  XADD
                
                
                  '
                
                ,KEYS[
                
                  3
                
                ],
                
                  '
                
                
                  *
                
                
                  '
                
                ,
                
                  '
                
                
                  userId
                
                
                  '
                
                ,ARGV[
                
                  1
                
                ],
                
                  '
                
                
                  voucherId
                
                
                  '
                
                ,ARGV[
                
                  2
                
                ],
                
                  '
                
                
                  id
                
                
                  '
                
                ,ARGV[
                
                  3
                
                ])
                
                  --
                
                
                  key 的取值与对象 VoucherOrder 中变量属性对应,分别是,userId,voucherId,id(订单id)
                
                
                  
--
                
                
                  满足下单条件
                
                
                  return
                
                
                  0
                
              

  。

因为在redis 的stream 命令中 没有判断 consumer group 是否存在,所以需要变通的方法,解决这个问题,因为如果组不存的情况下,进行xadd 会报错.

创建队列和消费者组 。

                
                  /**
                
                
                  
     * 查询队列信息,如果不存在会出现异常,在异常中创建队列
     * 
                
                
                  @param
                
                
                   streamKey
     * 
                
                
                  @param
                
                
                   groupKey
     
                
                
                  */
                
                
                  private
                
                
                  void
                
                
                   CreateStreamAndGroup(String streamKey,String groupKey)
    {
        
                
                
                  try
                
                
                   {
            
                
                
                  //
                
                
                  默认队列和组不存 创建队列
                
                
            stringRedisTemplate.opsForStream().createGroup(streamKey, ReadOffset.from("0"
                
                  ), groupKey);
            System.out.println(streamKey 
                
                + "队列" + streamKey + "和组" + groupKey + "创建成功。"
                
                  );
        }

        
                
                
                  catch
                
                
                   ( Exception ex)
        {
            String errx 
                
                =
                
                   ex.getMessage().toString();

            
                
                
                  if
                
                (errx.indexOf("BUSYGROUP Consumer Group name already exists")>0)
                
                  //
                
                
                  队列与组都已经存在
                
                
                              {
                
                
                
                  //
                
                
                  System.out.println(streamKey + "队列" + streamKey + "和组" + groupKey + "都已经存在");
                
                
                              }
            
                
                
                  else
                
                
                  
            {
                log.debug(ex.getMessage());
            }
        }


    }
                
              

 判断队列是否存在  。

                
                  /**
                
                
                  
     * 判断consumer group 是否存在,如果队列不存在直接返回 false,
     * 如果队列存在,不论组是否存,都直接创建。
     
                
                
                  */
                
                
                  private
                
                
                  boolean
                
                
                   ConsumerGroupExists(String streamKey,String groupKey)
    {
        
                
                
                  boolean
                
                 isok=
                
                  false
                
                
                  ;
        
                
                
                  try
                
                
                   {
            
                
                
                  //
                
                
                  默认队列和组不存 创建队列
                
                
            Collection<String> streamCollect=
                
                  new
                
                 ArrayList<>
                
                  ();
            streamCollect.add(streamKey);
            Long c_stream 
                
                =
                
                   stringRedisTemplate.countExistingKeys(streamCollect);
            
                
                
                  if
                
                (c_stream==0
                
                  )
            {
                isok
                
                = 
                
                  false
                
                ;
                
                  //
                
                
                  队列不存在
                
                
                  return
                
                
                   isok;
            }
            
                
                
                  //
                
                
                  队列存在 ,那么直接创建消费者组,
                
                
            stringRedisTemplate.opsForStream().createGroup(streamKey, ReadOffset.from("0"
                
                  ), groupKey);
            System.out.println(streamKey 
                
                + "队列" + streamKey + "存在,组" + groupKey + "创建成功。"
                
                  );
            isok
                
                =
                
                  true
                
                ;
                
                  //
                
                
                  组创建成功
                
                
                          }

        
                
                
                  catch
                
                
                   ( Exception ex)
        {
            String errx 
                
                =
                
                   ex.getMessage().toString();

            
                
                
                  if
                
                (errx.indexOf("BUSYGROUP Consumer Group name already exists")>0)
                
                  //
                
                
                  队列与组都已经存在
                
                
                              {
                isok
                
                =
                
                  true
                
                ;
                
                  //
                
                
                  组创建成功
                
                
                
                  //
                
                
                  System.out.println(streamKey + "队列" + streamKey + "和组" + groupKey + "都已经存在");
                
                
                              }
            
                
                
                  else
                
                
                  
            {
                isok
                
                =
                
                  false
                
                ;
                
                  //
                
                
                  未知异常
                
                
                                  log.debug(ex.getMessage());
            }
        }
                
                
                  finally
                
                
                   {

            
                
                
                  return
                
                
                    isok;
        }


    }
                
              

  。

订单创建入口函数(放在controller 或 seervice impl) 中 。

                
                  public
                
                
                   Result secKillVoucher(Long voucherId) {


        
                
                
                  long
                
                 userID =
                
                   UserHolder.getUser().getId();

        String vouchStockKey 
                
                = RedisConstants.SECKILL_VOUCHERSTOCK_CACHE_KEY +
                
                   voucherId;
        String userid_s 
                
                =
                
                   String.valueOf(userID);
        String vouchOrderKey 
                
                = RedisConstants.SECKILL_ORDER_CACHE_KEY +
                
                   voucherId;

        String streamKey 
                
                =
                
                   RedisConstants.REDIS_STREAM_QUEUE_ORDER ;
        String groupKey 
                
                =
                
                   RedisConstants.REDIS_STREAM_GROUP_ORDER;

        List
                
                <String> keys = 
                
                  new
                
                 ArrayList<>
                
                  ();
        keys.add(vouchStockKey);
                
                
                  //
                
                
                  库存key KEYS[1]
                
                
        keys.add(vouchOrderKey);
                
                  //
                
                
                  订单key KEYS[2]
                
                
        keys.add(streamKey);
                
                  //
                
                
                  队列key  KEYS[3]
                
                
        keys.add(groupKey);
                
                  //
                
                
                  消费组key  KEYS[4]
        
                
                
                  //
                
                
                  check sec_voucher_stock
        
                
                
                  //
                
                
                   1 查询优惠券
                
                
                  int
                
                 count =
                
                   seckillVoucherService.getRedisStock(voucherId);
        
                
                
                  if
                
                 (count <= 0
                
                  ) {
            
                
                
                  return
                
                 Result.fail("优惠券库存不存在 count:" +
                
                   count);

        }
        
                
                
                  long
                
                 orderid =
                
                   redisIdWorker.nextId(RedisConstants.ID_ORDER);
        CreateStreamAndGroup(streamKey,groupKey);

        
                
                
                  //
                
                
                  订单ID ARGV[3]
        
                
                
                  //
                
                
                  优惠券  id ARGV[2]
        
                
                
                  //
                
                
                  用户id ARGV[1]
        
                
                
                  //
                
                
                  执行lua 完成优惠券库存,一人一单,检查 并创建用户与优惠券ID之间的关联
        
                
                
                  //
                
                
                  脚本完成 创建消息队列 (执行此脚本之前确保队列 已经创建
                
                
        Long sekResult =
                
                   stringRedisTemplate.execute(SECKILL_SCRIPT,
                keys
                , userid_s,String.valueOf(voucherId),String.valueOf(orderid)
        );
        
                
                
                  int
                
                 ri =
                
                   sekResult.intValue();
        System.out.println(
                
                "lua 脚本执行返回值 :"+
                
                  ri);
        
                
                
                  if
                
                (ri!=0
                
                  )
        {
            
                
                
            String errtip="优惠券库存不足"
                
                  ;
           
                
                
                  switch
                
                
                   (ri)
           {
               
                
                
                  case
                
                 2
                
                  :
                   errtip
                
                ="订单重复一人一单"
                
                  ;
                       
                
                
                  break
                
                
                  ;
               
                
                
                  case
                
                 3
                
                  :
                   errtip
                
                ="库存数据不存在"
                
                  ;
                   
                
                
                  break
                
                
                  ;
               
                
                
                  case
                
                 4
                
                  :
                   errtip
                
                ="消息队列不存在"
                
                  ;
                   
                
                
                  break
                
                
                  ;
               
                
                
                  case
                
                 1
                
                  :
                   errtip
                
                ="优惠券库存不足"
                
                  ;
                   
                
                
                  break
                
                
                  ;
               
                
                
                  default
                
                
                  :
                   errtip
                
                ="未知错误"
                
                  ;

           }



          
                
                
                  return
                
                
                     Result.fail(errtip);
        }
    
                
                
                  /**
                
                
                  
        注意:下面的代理对象要有
     
                
                
                  */
                
                
                  
        proxy
                
                =
                
                  (IVoucherOrderService)AopContext.currentProxy();

        
                
                
                  return
                
                
                   Result.ok(orderid);

    }
                
              

  。

创建线程获取消息队列并处理 。

                
                  private
                
                
                  static
                
                
                  final
                
                 ExecutorService SECKILL_ORDER_EXECUTOR=
                
                  Executors.newSingleThreadExecutor();
    @PostConstruct
    
                
                
                  private
                
                
                  void
                
                
                   init(){
        
                
                
                  //
                
                
                  本地阻塞队列的方式
       
                
                
                  //
                
                
                   SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
        
                
                
                  //
                
                
                  读取redis消息队列并处理
                
                
        SECKILL_ORDER_EXECUTOR.submit(
                
                  new
                
                
                   VoucherOrderHandlerRedisQueue());
    }
   
                
                
                  //
                
                
                  从redis 消息队列中获取消息
                
                
                  private
                
                
                  class
                
                 VoucherOrderHandlerRedisQueue 
                
                  implements
                
                
                   Runnable{

        @Override
        
                
                
                  public
                
                
                  void
                
                
                   run() {
            
                
                
                  while
                
                (
                
                  true
                
                
                  ){
                
                
                
                  //
                
                
                  get orderinfo from blockingqueen
                
                
                  try
                
                
                   {
                    String streamKey 
                
                =
                
                   RedisConstants.REDIS_STREAM_QUEUE_ORDER ;
                    String groupKey 
                
                =
                
                   RedisConstants.REDIS_STREAM_GROUP_ORDER;
                    String cs
                
                =
                
                  RedisConstants.REDIS_STREAM_CONSUMER_ORDER;

                    
                
                
                  //
                
                
                  判断组是否存在
                
                
                  if
                
                 (!
                
                  ConsumerGroupExists(streamKey,groupKey)) {
                       
                
                
                  //
                
                
                   log.error("异步线程读取redis stream 失败:队列 未创建:"+streamKey);
                
                
                        Thread.sleep(3000
                
                  );
                        
                
                
                  continue
                
                
                  ;
                    }
                     
                
                
                  //
                
                
                  从消息队列中获取消息
                
                
                    List<MapRecord<String, Object, Object>> queueList =
                
                   stringRedisTemplate.opsForStream()
                            .read(Consumer.from( groupKey,cs),
                            StreamReadOptions.empty().count(
                
                1).block(Duration.ofSeconds(2
                
                  ))
                            , StreamOffset.create(streamKey,ReadOffset.lastConsumed())
                    );
                    
                
                
                  if
                
                (queueList==
                
                  null
                
                 ||
                
                   queueList.isEmpty() )
                    {
                        
                
                
                  //
                
                
                  未获取到消息
                
                
                  continue
                
                ;
                
                  //
                
                
                  continue do next
                
                
                                      }
                    
                
                
                  //
                
                
                  处理消息
                
                
                    MapRecord<String, Object, Object> record = queueList.get(0
                
                  );
                    Map
                
                <Object, Object> vq =
                
                   record.getValue();
                    VoucherOrder voucherOrder
                
                = BeanUtil.fillBeanWithMap(vq,
                
                  new
                
                 VoucherOrder(),
                
                  false
                
                
                  );

                    
                
                
                  //
                
                
                  成功获取 创建订单到数据库
                
                
                                     handlerVoucherOrder(voucherOrder);
                   
                
                
                  //
                
                
                  确认消息 在PEL中移除
                
                
                                      stringRedisTemplate.opsForStream().acknowledge(streamKey,groupKey,record.getId());

                } 
                
                
                  catch
                
                
                   (Exception e) {

                    HandlePendingList();
                    log.error(e.getMessage().toString());
                    
                
                
                  //
                
                
                  throw new RuntimeException(e);
                
                
                                  }


            }
        }
                
              

异常处理 在redis stream pending List 中获取 队列 。

                
                  private
                
                
                  void
                
                
                   HandlePendingList() {
            String streamKey 
                
                = RedisConstants.REDIS_STREAM_QUEUE_ORDER ;
                
                  //
                
                
                  stream
                
                
            String groupKey = RedisConstants.REDIS_STREAM_GROUP_ORDER;
                
                  //
                
                
                   consumer group
                
                
            String cs=RedisConstants.REDIS_STREAM_CONSUMER_ORDER;
                
                  //
                
                
                  consumer
                
                
                  while
                
                 (
                
                  true
                
                
                  )
                
                
                
                  try
                
                
                   {
                    {
                        
                
                
                  //
                
                
                  read from  pel
                
                
                    List<MapRecord<String, Object, Object>> queueList =
                
                   stringRedisTemplate.opsForStream()
                            .read(Consumer.from ( groupKey,cs),
                            StreamReadOptions.empty().count(
                
                1
                
                  )
                            , StreamOffset.create(streamKey, ReadOffset.from(
                
                
                   "0" 
                
                
                  ))
                    );
                    
                
                
                  if
                
                (queueList==
                
                  null
                
                 ||
                
                  queueList.isEmpty())
                    {
                        
                
                
                  //
                
                
                  未获取到消息
                
                
                  break
                
                ;
                
                  //
                
                
                  continue do next normal
                
                
                                      }
                    
                
                
                  //
                
                
                  处理消息
                
                
                    MapRecord<String, Object, Object> record = queueList.get(0
                
                  );
                    Map
                
                <Object, Object> vq =
                
                   record.getValue();
                    VoucherOrder voucherOrder
                
                = BeanUtil.fillBeanWithMap(vq,
                
                  new
                
                 VoucherOrder(),
                
                  false
                
                
                  );

                    
                
                
                  //
                
                
                  成功获取
                
                
                                      handlerVoucherOrder(voucherOrder);
                    
                
                
                  //
                
                
                  确认消息
                
                
                                      stringRedisTemplate.opsForStream().acknowledge(streamKey,groupKey,record.getId());

                    }
                } 
                
                
                  catch
                
                
                   (Exception e) {
                    log.debug(
                
                "消息队列--peding List 处理异常"
                
                  );
                    
                
                
                  try
                
                
                   {
                        Thread.sleep(
                
                50
                
                  );
                    } 
                
                
                  catch
                
                
                   (InterruptedException ex) {
                        
                
                
                  throw
                
                
                  new
                
                
                   RuntimeException(ex);
                    }

                }
        }
    }
                
              

  。

订单处理 。

                
                  private
                
                
                    IVoucherOrderService proxy;
    
                
                
                  private
                
                
                  void
                
                
                   handlerVoucherOrder(VoucherOrder voucherOrder) {

        
                
                
                  if
                
                (voucherOrder==
                
                  null
                
                 || voucherOrder.getUserId()==
                
                  null
                
                
                  )
        {
            log.debug(
                
                "对象为空 或 属性用户ID 为空。"
                
                  );
        }
        
                
                
                  //
                
                
                   user ID
                
                
                  long
                
                 userID=
                
                   voucherOrder.getUserId();

        RLock lock
                
                =redisson.getLock(RedisConstants.LOCK_VOUCHERORDER_KEY+
                
                   StrUtil.toString(userID));
        
                
                
                  boolean
                
                 islock = lock.tryLock();
                
                  //
                
                
                  active watch dog
                
                
                  if
                
                (!
                
                  islock)
        {
           log.error(
                
                "锁创建失败"
                
                  );
        }

        
                
                
                  try
                
                
                   {
            
                
                
                  //
                
                
                  IVoucherOrderService proxy=(IVoucherOrderService)AopContext.currentProxy();
                
                
                  //
                
                
                  获取spring 对当前对象的代理
                
                
                               proxy.createVoucherOrderByObj(voucherOrder);

        } 
                
                
                  catch
                
                
                   (IllegalStateException e) {
            
                
                
                  throw
                
                
                  new
                
                
                   RuntimeException(e);
        }
                
                
                  finally
                
                
                   {
            
                
                
                  //
                
                
                  lock.unLock(); 
                
                
                  //
                
                
                  my define simple redis lock
                
                
            lock.unlock();
                
                  //
                
                
                  redisson release lock
                
                
                          }

    }
                
              

实现类中的方法 。

  。

                 @Transactional
                
                  //
                
                
                  因为方法中 订单的操作 和 库存扣减,所以增加事务支持 为防止数据 不同步
                
                
                  public
                
                
                  void
                
                
                   createVoucherOrderByObj(VoucherOrder voucher) {
        
                
                
                  //
                
                
                   user ID
                
                
                  if
                
                (voucher==
                
                  null
                
                 || voucher.getUserId()==
                
                  null
                
                
                  )
        {
            log.debug(
                
                "对象为空 或 属性用户ID 为空。"
                
                  );
            
                
                
                  return
                
                
                  ;
        }
        
                
                
                  long
                
                 userID=
                
                  voucher.getUserId();

        
                
                
                  int
                
                 count=query().eq("user_id",userID).eq("voucher_id"
                
                  , voucher.getVoucherId()).count();
        
                
                
                  if
                
                (count>0
                
                  )
        {
           log.error(
                
                "优惠券仅限每人一个 count:"+
                
                  count);
           
                
                
                  return
                
                
                  ;
        }

        
                
                
                  //
                
                
                  5扣减库存
                
                
                  boolean
                
                 success=
                
                  seckillVoucherService.update()
                .setSql(
                
                "stock=stock-1"
                
                  )
                .eq(
                
                "voucher_id", voucher.getVoucherId()).gt("stock",0
                
                  )
                .update();
        
                
                
                  if
                
                (!
                
                  success)
        {
            log.error(
                
                "扣减券库存失败Obj: voucher.getVoucherId():"+
                
                   voucher.getVoucherId());
            
                
                
                  return
                
                
                  ;
        }
        System.out.println(
                
                "voucher saved"
                
                  );
        
                
                
                  boolean
                
                 ds =
                
                   save(voucher);


    }
                
              

  。

最后此篇关于Java+Redis通过Lua完成库存扣减,创建消息队列,异步处理消息--实战的文章就讲到这里了,如果你想了解更多关于Java+Redis通过Lua完成库存扣减,创建消息队列,异步处理消息--实战的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com