gpt4 book ai didi

高并发环境下生成序列编码重复问题分析

转载 作者:我是一只小鸟 更新时间:2023-03-14 06:31:15 25 4
gpt4 key购买 nike

1、背景 。

有个业务系统(订单系统),通过后台日志和监控观察,系统偶尔会出现重复唯一索引问题,例如:后台日志片段 。

  Duplicate entry 'service_no'  for key 'idx_service_no' .... 。

也就是说写入数据与数据库已有数据发生重复.

下面我们分析一下问题出现在哪里:

这个字段就是业务编码 :service_no 。

这个取号规则是 :要固定长度13位数,由首写大写字母 F+年月日(201201)+6 位有序数字 组成.

6位有序数字,是当天数据库增量已有数据最大编码序列依次累加.

首先说明一下: 每天6位数字(最高是999999)一天最多99万个序列号绝对够当前业务使用的,(实际上一天最多也就几万个单号),所以号量是满足业务需求的.

业务规则没有问题,那说明是程序代码逻辑有问题了,我们来看一下代码情况:

  。

2、分析代码逻辑结构 。

  。

1、首先取号 。

获取生成下一个序列号取号方法如下:

  。

              
                /**
	 * 创建编码内部自带加锁
	 * @param rule 编码规则参数
	 * @param params 参数数组
	 * @return 取号结果字符串
	 */
   @Transactional
	public String create(String rule, String... params) {
		String code = "";
		String lockName = rule;
		if (params.length > 0 && StringUtils.isNotEmpty(params[0])) {
			lockName = rule + ":" + params[0];
		}
		if (params.length > 1) {
			lockName = rule + ":" + params[0] + ":" + params[1];
		}

		try {
            //加jedis客户端工具分布式锁
			RedisLocker.lock(lockName, 10);
			code = getNext(rule, params);
		} finally {
            //释放锁
			RedisLocker.unlock(lockName);
		}
		return code;
	}

	/**
	 * 创建序列编码,无事务控制,需要依赖外层加redis锁!
	 * @param rule 生成规则
	 * @param params 参数列表
	 * @return 取号结果字符串
	 */
	private String getNext(String rule, String... params) {
		String[] rules = rule.split("-");
		CodeCondition condition = new CodeCondition();
		Code code = new Code();
		String prefix = "";
		String num = "";
		String digit = "";
		int i = 0;
		for (String str : rules) {
			if (str.equals("r")) {
				// 类型
				code.setType(params[i]);
				condition.setType(params[i]);
				prefix += params[i];
				i++;
			} else if (str.equals("c")) {
				// 商家编码
				code.setShopCode(params[i]);
				condition.setShopCode(params[i]);
				prefix += params[i];
				i++;
			} else if (str.contains("yy") || str.contains("MM")) {
				// 日期
				//SimpleDateFormat formatter = new SimpleDateFormat(str);
				String t = DateUtils.getCurrentDate(str);
				code.setTime(t);
				prefix += t;
			} else if (str.contains("N")) {
				// 数字
				digit = str.substring(1);
			} else {
				// 其它
				prefix += str;
			}
		}
		code.setPrefix(prefix);
		condition.setPrefix(prefix);
		if (digit.length() > 0) {
			int n = 1;
			Integer serialNumber = codeDao.findNewOneByManual(condition);
			if (serialNumber != null) {
				n = serialNumber + 1;
			}
			code.setSerialNumber(n);
			num = String.format("%0" + digit + "d", n);
		}
		code.setCode(prefix + num);
		// 添加3次重试机制

		boolean success = codeDao.getNextCode(code);
		if (success) {
			return prefix + num;
		}
		return null;
	}
              
            

  。

2、数据操作层事务 。

提交到DAO层准备交给执行JDBC去执行提交到数据库,主要使用了 手动控制事务 提交代码如下:

  。

              
                /**
	 * 手动控制事务
	 * @param param 提交新的对象值更新
	 * @return
	 */
	public boolean getNextCode(final Code param){
        //手动控制事务
		transactionTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
		transactionTemplate.setIsolationLevel(TransactionDefinition.ISOLATION_READ_COMMITTED);
		return transactionTemplate.execute(new TransactionCallback<Boolean>() {
			public Boolean doInTransaction(final TransactionStatus status) {
				try {
					int i = 0;
					CodeCondition con = new CodeCondition();
					con.setType(param.getType());
					con.setTime(param.getTime());
					con.setShopCode(param.getShopCode());
                    //根据入参条件查询是否存在编码对象数据
					Code c = findNewOne(con);
					if(c == null){
						i = jdbcTemplate.update("INSERT INTO xx_code(id,code,type,time,shop_code,serial_number,prefix,remarks) VALUES(?,?,?,?,?,?,?,?)",
								UUID.randomUUID().toString().replaceAll("-", ""),
								param.getCode(),
								param.getType(),
								param.getTime(),
								param.getShopCode(),
								param.getSerialNumber(),
								param.getPrefix(),
								param.getRemarks());
					} else {
						List<Object> params = new ArrayList<Object>();
						params.add(param.getCode());
						params.add(param.getPrefix());
						params.add(param.getSerialNumber());
						params.add(c.getTime());
						params.add(c.getId());
						//使用旧值做匹配条件
						String sql = "UPDATE xx_code SET code=?, prefix=?,serial_number=?,time=?  WHERE id=? ";
						i = jdbcTemplate.update(sql, params.toArray());
					}
					return i > 0;
				} catch (Exception ex) {
					 status.setRollbackOnly();
					logger.error(ex.getMessage(),ex);
				}
				return false;
			}
		});
	}
              
            

  。

这里备注说明一下几个事务属于参数:

PROPAGATION_REQUIRED-- 支持当前事务,如果当前没有事务,就新建一个事务。这是最常见的选择。  。

假如当前正要执行的事务不在另外一个事务里,那么就起一个新的事务 .

ServiceA {                 void methodA() {            ServiceB.methodB();        }   }       ServiceB {                 void methodB() {        }            }    比如说,ServiceB.methodB的事务级别定义为PROPAGATION_REQUIRED, 那么由于执行ServiceA.methodA的时候   1、如果ServiceA.methodA已经起了事务,这时调用ServiceB.methodB,ServiceB.methodB看到自己已经运行在ServiceA.methodA的事务内部,就不再起新的事务。这时只有外部事务并且他们是共用的,所以这时ServiceA.methodA或者ServiceB.methodB无论哪个发生异常methodA和methodB作为一个整体都将一起回滚。   2、如果ServiceA.methodA没有事务,ServiceB.methodB就会为自己分配一个事务。这样,在ServiceA.methodA中是没有事务控制的。只是在ServiceB.methodB内的任何地方出现异常,ServiceB.methodB将会被回滚,不会引起ServiceA.methodA的回滚.

 在 spring的 TransactionDefinition接口中一共定义了六种事务传播属性:  PROPAGATION_REQUIRED -- 支持当前事务,如果当前没有事务,就新建一个事务。这是最常见的选择。  PROPAGATION_SUPPORTS -- 支持当前事务,如果当前没有事务,就以非事务方式执行。  PROPAGATION_MANDATORY -- 支持当前事务,如果当前没有事务,就抛出异常。  PROPAGATION_REQUIRES_NEW -- 新建事务,如果当前存在事务,把当前事务挂起。  PROPAGATION_NOT_SUPPORTED -- 以非事务方式执行操作,如果当前存在事务,就把当前事务挂起。  PROPAGATION_NEVER -- 以非事务方式执行,如果当前存在事务,则抛出异常。  PROPAGATION_NESTED -- 如果当前存在事务,则在嵌套事务内执行。如果当前没有事务,则进行与PROPAGATION_REQUIRED类似的操作。  前六个策略类似于EJB CMT,第七个(PROPAGATION_NESTED)是Spring所提供的一个特殊变量。  它要求事务管理器或者使用JDBC 3.0 Savepoint API提供嵌套事务行为(如Spring的DataSourceTransactionManager).

  。

3、分布式锁工具 。

使用jedis工具包作为分布式锁代码如下:

  。

              
                /**
	 * 在指定时间内等待获取锁
	 * @param waitTime 等待锁的时间
	 * 
	 */
	public boolean tryLock(long waitTime, TimeUnit unit) throws InterruptedException {
		// 系统当前时间,以毫微秒为单位。
		long nano = System.nanoTime(); 
		do {
			if(tryLock()){
				logger.debug(this.lockValue + "获取锁");
				return Boolean.TRUE;
			}
			Thread.sleep(new Random().nextInt(100) + 1);
		} while ((System.nanoTime() - nano) < unit.toNanos(waitTime));

		return Boolean.FALSE;
	}
	
	/**
	 * 阻塞式加锁
	 */
	public void lock() {
		while(!tryLock()){
			try {
				// 睡眠,降低抢锁频率,缓解redis压力
				Thread.sleep(new Random().nextInt(100) + 1); 
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}
	
	/**
	 * 获取锁的线程解锁
	 * 不足:当时阿里云redis集群版本暂不支持执行lua脚本eval函数。
	 * 参考:https://help.aliyun.com/document_detail/26356.html?spm=5176.11065259.1996646101.searchclickresult.3dd24026cWCgRN
	 * 
	 */
	public void unlock() {
		// 检查当前线程是否持有锁
		if (this.lockValue.equals(jedis.get(this.lockName))) {
			logger.debug(this.lockValue + "释放锁");
			try {
				jedis.del(this.lockName);
			} finally {
				// Jedis 客户端版本是使用 Jedis-2.7.2版本;如果是2.9以上本的版注意这里不是关闭连接,在JedisPool模式下,Jedis会被归还给资源池。
				if (jedis != null) {
					jedis.close();
				}
			}
		} else {
			logger.debug(Thread.currentThread().getName() + "并非持有锁的线程,未能解锁");
		}
	}
              
            

  。

上面展示代码展示分布式销和事务一些使用, 但请注意这里有坑!!! 。

  。

4、存在哪些坑?

坑1 : Jedis客户端版本要注意一下,如果是3.0.1及以上的话 jedis.close();已经被重写,请看客方源代码.

2.9 版本以前连接池使用有returnResource接口方法,3.0.1之后版本被去掉了.

官方重写了close方法,jedis.close不能直接调用.

try {     jedis = pool.getResource(); } finally { if (jedis != null) {     jedis.close();     } } 。

某一次升级了jedis client版本还导致生产环境redis服务被打暴,引发重大事故,所以如果使用jedis client建议使用2.9以下版本还靠谱一些.

其实这种写法,还有一些问题的,需要进一步改进。如何改进?我们往下一步分析.

改进版本1(使用lua脚本替代):

              
                 /**
     * 尝试获取分布式锁
     * @param jedis Redis客户端
     * @param lockKey 锁
     * @param requestId 请求标识
     * @param expireTime 超期时间
     * @return 是否获取成功
     */
    public static boolean tryLock(Jedis jedis, String lockKey, String requestId, long expireTime) {

        //jedis 3.0之后写法
        /*   SetParams params = new SetParams();
        params.px(expireTime);
        params.nx();*/
        String result = jedis.set(lockKey,  requestId,"NX", "PX", expireTime);

        if (LOCK_SUCCESS.equals(result)) {
            return true;
        }
        return false;

    }

    /**
     * 释放分布式锁(LUA脚本实现)
     * @param jedis Redis客户端
     * @param lockKey 锁
     * @param requestId 请求标识
     * @return
     */
    public static boolean unLock(Jedis jedis, String lockKey, String requestId) {

        String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return " +
                "0 end";
        Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId));
        if (RELEASE_SUCCESS.equals(result)) {
            return true;
        }
        if (jedis != null) {
            jedis.close();
        }
        return false;
    }
              
            

  。

 当时阿里云redis集群版本暂不支持执行lua脚本eval函数。所以当时lua脚本方式的无发挥之地!  参考:https://help.aliyun.com/document_detail/26356.html?spm=5176.11065259.1996646101.searchclickresult.3dd24026cWCgRN 。

  。

改进版本2(使用redisson替代):

              
                /**
 * 利用redisson client 实现分布式锁
 * @author cgli
 */
public final class RedisLocker {

	private final static String LOCKER_PREFIX = "lock:";

	private static final Logger logger = LoggerFactory.getLogger(RedisLocker.class);

	private RedisLocker() {

    }

	/**
	 * 获取连接配置实例
	 * @return
	 */
	private static RedissonClient getClient() {
		return SingletonHolder.client;
	}

	/**
     * 根据name对进行上锁操作,redissonLock 阻塞式的,采用的机制发布/订阅
	 * timeout结束强制解锁,防止死锁 :1分钟
     * @param lockName 锁名称
     */
    public static void lock(String lockName){
       lock(lockName,60);
    }

    /**
	 * 根据name对进行上锁操作采用redisson RLOCK加锁方式
	 * @param lockName 锁名称
	 * @param leaseTime 结束强制解锁,防止死锁 :单位秒
	 */
	public static void lock(String lockName,long leaseTime){
		String key = LOCKER_PREFIX + lockName;
		RLock lock = getClient().getLock(key);
		//lock提供带timeout参数,timeout结束强制解锁,防止死锁 :1分钟
		lock.lock(leaseTime, TimeUnit.SECONDS);
	}

    /**
     * 根据name对进行解锁操作
     * @param lockName
     */
    public static void unlock(String lockName){
		String key = LOCKER_PREFIX + lockName;
		RLock lock = getClient().getLock(key);
		if(lock.isLocked()){
			if(lock.isHeldByCurrentThread()){
				lock.unlock();
			}
		}
    }

	/**
	 *
	 * @param resourceName 锁的KEY名称
	 * @param worker 回调外部工作任务
	 * @param lockTime 锁定超时时间,默认acquireTimeout=100second 获取锁的超时时间
	 * @param <T>
	 * @return
	 * @throws Exception
	 */
	public static <T> T lock(String resourceName, AquiredLockWorker<T> worker,
			long lockTime) throws Exception {
		return lock(resourceName, worker, 100, lockTime);
	}

	/**
	 *
	 * @param resourceName 锁的KEY名称
	 * @param worker 回调外部工作任务
	 * @param acquireTimeout 获取锁的超时时间
	 * @param lockTime 锁定超时时间
	 * @param <T>
	 * @return
	 * @throws Exception
	 */
	public static <T> T lock(String resourceName, AquiredLockWorker<T> worker, long acquireTimeout,
							 long lockTime) throws Exception {
		RLock lock = getClient().getLock(LOCKER_PREFIX + resourceName);

		// Acquire lock and release it automatically after 10 seconds
		// if unlock method hasn't been invoked
		//lock.lock(10, TimeUnit.SECONDS);

		try {
			// Wait for acquireTimeout seconds and automatically unlock it after lockTime seconds
			boolean res = lock.tryLock(acquireTimeout, lockTime, TimeUnit.SECONDS);
			if (res) {
				return worker.execute();
			}
		} finally {
			if (lock != null) {
				lock.unlock();
			}
		}
		return null;
	}

	/**
	 * 内部类实现单例模式
	 */
	static class SingletonHolder {
		private static RedissonClient client = init();

		private static RedissonClient init() {
			RedisProperties properties = ApplicationContextHolder.getApplicationContext().getBean(RedisProperties.class);
			String host = properties.getRedisHost();
			int port = Integer.parseInt(properties.getRedisPort());
			String password = properties.getRedisPassword();
			if (StringUtils.isEmpty(password)) {
				password = null;
			}
			int database = Integer.parseInt(properties.getRedisDataBase());
			try {
				Config config = new Config();
				config.useSingleServer()
						.setAddress("redis://" + host + ":" + port)
						.setPassword(password)
						.setDatabase(database)
						//同任何节点建立连接时的等待超时。时间单位是毫秒。默认:10000
						.setConnectTimeout(30000)
						//当与某个节点的连接断开时,等待与其重新建立连接的时间间隔。时间单位是毫秒。默认:3000
						//等待节点回复命令的时间。该时间从命令发送成功时开始计时。默认:3000
						.setTimeout(10000)
						//如果尝试达到 retryAttempts(命令失败重试次数) 仍然不能将命令发送至某个指定的节点时,将抛出错误。如果尝试在此限制之内发送成功,则开始启用 timeout(命令等待超时)
						// 计时。默认值:3
						.setRetryAttempts(5)
						//在一条命令发送失败以后,等待重试发送的时间间隔。时间单位是毫秒。     默认值:1500
						.setRetryInterval(3000);
				return Redisson.create(config);
			} catch (Exception e) {
				logger.error(e.getMessage(), e);
			}
			return null;
		}
	}

}
              
            

  。

  坑2 :使用@Transactional 注解 嵌套事务的问题,会导致一些事务混乱,达不到最终的数据效果.

 回去最开始代码处 。

    @Transactional  //这里加了事务注解     public String create(String rule, String... params) { 。

     try {             //加jedis客户端工具分布式锁             RedisLocker.lock(lockName, 10);             code = getNext(rule, params);         } finally {             //释放锁             RedisLocker.unlock(lockName);         } 。

  } 。

 这种结果达不到预期的结果,就是会出现 有时会产生重复的编码 ,如本文提出的问题。这是为什么呢?

由于spring的AOP机制,会在 update/save方法之前开启事务 ,在这之后再加锁,当锁住的代码执行完成后, 再提交事务 ,因此锁代码块执行是在事务之内执行的.

可以推断在代码块执行完时, 事务还未提交 ,这时如果其他线程进入锁代码块后,读取的库存数据就不是最新的,就可能产生了不是你想要的结果数据.

  。

这个问题我们验证测试一下,写一个测试用例 。

用jmeter压力测试跑一下,取序列号的结果 。

  。

  。

在日志分析中,经常是有两个线程查到上一个相同的序列号,拿到的不是更新之后的数据结果,如下图

  。

产生重复编码,说明jedis分布式锁并没有真实锁住。问题就是出现在最外层的 @ Transactional注解上,最外层调用代码完全没有必要加了@Transactional.

因为最内层已经加了手动控制事务的控制。拿掉外层的@Transactional再跑压力测试一切正常.

  。

三 、分析代码逻辑结构 。

  。

1、 Redis java客户端不推荐使用jedis,特别是2.9以上的版本,代码没有处理好很容易搞把redis服务搞死(连接数打满),推荐使用redisson代替,性能高且内置实现连接池.

如果真的一定要用jedis使用2.9以下版本并使用 lua脚本来控制,才能实现真正原子性操作.

2、事务注解@Transactional事务控制,嵌套使用时要注意,尽量控制在最小单元的最内层使用,在最外层(大方法)使用有风险,特别是跟锁一起使用时要注意控制两者顺序.

另外@Transactional在分布式环境下,远程调用无效的,并不能当作分布式事务来对待,这个业内有成熟其他方案替代.

3、其实取号生成序列号服务,没有必要使用数据库当作序列计数,完全可以使用redis计数器做实现(内部版本2年前就用redis版本来取号,连续运行两2年多没有发现有啥问题).

示例代码如下:

              
                 private String generateByRedis(BusinessCodeCondition condition) {
        String time = "";
        String prefix = "";
        String type = condition.getType();
        StringBuilder sb = new StringBuilder();
        sb.append(type);
        if (StringUtils.isNotEmpty(condition.getShopCode())) {
            sb.append(condition.getShopCode());
        }
        if (StringUtils.isNotEmpty(condition.getDateFormat())) {
            time = DateFormatUtil.formatDate(condition.getDateFormat(), new Date());
            sb.append(time);
        }
        prefix = sb.toString();
        sb.setLength(0);
        String key = KEY_PREFIX + prefix;
        //long n = redisTemplate.opsForValue().increment(key, 1L);
        //redisTemplate.expire(key, EXPIRE_TIME, TimeUnit.DAYS);
        long n = getIncrementNum(key, condition.getDateFormat());
        return String.format("%s%0" + condition.getDigitCount() + "d", prefix, n);
    }

    //使用redis计数器取序列,每天过期前一天的key值回收。
    private Long getIncrementNum(String key, String dateFormat) {
        RedisAtomicLong entityIdCounter = new RedisAtomicLong(key, redisTemplate.getConnectionFactory());
        Long counter = entityIdCounter.incrementAndGet();
        if ((null == counter || counter.longValue() == 1)) {
            if (StringUtils.isNotEmpty(dateFormat) && dateFormat.indexOf("yyMMdd") > -1) {
                entityIdCounter.expire(EXPIRE_TIME, TimeUnit.DAYS);
            }
        }
        return counter;
    }
              
            

  。

最后此篇关于高并发环境下生成序列编码重复问题分析的文章就讲到这里了,如果你想了解更多关于高并发环境下生成序列编码重复问题分析的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com