- Java锁的逻辑(结合对象头和ObjectMonitor)
- 还在用饼状图?来瞧瞧这些炫酷的百分比可视化新图形(附代码实现)⛵
- 自动注册实体类到EntityFrameworkCore上下文,并适配ABP及ABPVNext
- 基于Sklearn机器学习代码实战
1、背景 。
有个业务系统(订单系统),通过后台日志和监控观察,系统偶尔会出现重复唯一索引问题,例如:后台日志片段 。
Duplicate entry 'service_no' for key 'idx_service_no' .... 。
也就是说写入数据与数据库已有数据发生重复.
下面我们分析一下问题出现在哪里:
这个字段就是业务编码 :service_no 。
这个取号规则是 :要固定长度13位数,由首写大写字母 F+年月日(201201)+6 位有序数字 组成.
6位有序数字,是当天数据库增量已有数据最大编码序列依次累加.
首先说明一下: 每天6位数字(最高是999999)一天最多99万个序列号绝对够当前业务使用的,(实际上一天最多也就几万个单号),所以号量是满足业务需求的.
业务规则没有问题,那说明是程序代码逻辑有问题了,我们来看一下代码情况:
。
2、分析代码逻辑结构 。
。
1、首先取号 。
获取生成下一个序列号取号方法如下:
。
/**
* 创建编码内部自带加锁
* @param rule 编码规则参数
* @param params 参数数组
* @return 取号结果字符串
*/
@Transactional
public String create(String rule, String... params) {
String code = "";
String lockName = rule;
if (params.length > 0 && StringUtils.isNotEmpty(params[0])) {
lockName = rule + ":" + params[0];
}
if (params.length > 1) {
lockName = rule + ":" + params[0] + ":" + params[1];
}
try {
//加jedis客户端工具分布式锁
RedisLocker.lock(lockName, 10);
code = getNext(rule, params);
} finally {
//释放锁
RedisLocker.unlock(lockName);
}
return code;
}
/**
* 创建序列编码,无事务控制,需要依赖外层加redis锁!
* @param rule 生成规则
* @param params 参数列表
* @return 取号结果字符串
*/
private String getNext(String rule, String... params) {
String[] rules = rule.split("-");
CodeCondition condition = new CodeCondition();
Code code = new Code();
String prefix = "";
String num = "";
String digit = "";
int i = 0;
for (String str : rules) {
if (str.equals("r")) {
// 类型
code.setType(params[i]);
condition.setType(params[i]);
prefix += params[i];
i++;
} else if (str.equals("c")) {
// 商家编码
code.setShopCode(params[i]);
condition.setShopCode(params[i]);
prefix += params[i];
i++;
} else if (str.contains("yy") || str.contains("MM")) {
// 日期
//SimpleDateFormat formatter = new SimpleDateFormat(str);
String t = DateUtils.getCurrentDate(str);
code.setTime(t);
prefix += t;
} else if (str.contains("N")) {
// 数字
digit = str.substring(1);
} else {
// 其它
prefix += str;
}
}
code.setPrefix(prefix);
condition.setPrefix(prefix);
if (digit.length() > 0) {
int n = 1;
Integer serialNumber = codeDao.findNewOneByManual(condition);
if (serialNumber != null) {
n = serialNumber + 1;
}
code.setSerialNumber(n);
num = String.format("%0" + digit + "d", n);
}
code.setCode(prefix + num);
// 添加3次重试机制
boolean success = codeDao.getNextCode(code);
if (success) {
return prefix + num;
}
return null;
}
。
2、数据操作层事务 。
提交到DAO层准备交给执行JDBC去执行提交到数据库,主要使用了 手动控制事务 提交代码如下:
。
/**
* 手动控制事务
* @param param 提交新的对象值更新
* @return
*/
public boolean getNextCode(final Code param){
//手动控制事务
transactionTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
transactionTemplate.setIsolationLevel(TransactionDefinition.ISOLATION_READ_COMMITTED);
return transactionTemplate.execute(new TransactionCallback<Boolean>() {
public Boolean doInTransaction(final TransactionStatus status) {
try {
int i = 0;
CodeCondition con = new CodeCondition();
con.setType(param.getType());
con.setTime(param.getTime());
con.setShopCode(param.getShopCode());
//根据入参条件查询是否存在编码对象数据
Code c = findNewOne(con);
if(c == null){
i = jdbcTemplate.update("INSERT INTO xx_code(id,code,type,time,shop_code,serial_number,prefix,remarks) VALUES(?,?,?,?,?,?,?,?)",
UUID.randomUUID().toString().replaceAll("-", ""),
param.getCode(),
param.getType(),
param.getTime(),
param.getShopCode(),
param.getSerialNumber(),
param.getPrefix(),
param.getRemarks());
} else {
List<Object> params = new ArrayList<Object>();
params.add(param.getCode());
params.add(param.getPrefix());
params.add(param.getSerialNumber());
params.add(c.getTime());
params.add(c.getId());
//使用旧值做匹配条件
String sql = "UPDATE xx_code SET code=?, prefix=?,serial_number=?,time=? WHERE id=? ";
i = jdbcTemplate.update(sql, params.toArray());
}
return i > 0;
} catch (Exception ex) {
status.setRollbackOnly();
logger.error(ex.getMessage(),ex);
}
return false;
}
});
}
。
这里备注说明一下几个事务属于参数:
PROPAGATION_REQUIRED-- 支持当前事务,如果当前没有事务,就新建一个事务。这是最常见的选择。 。
假如当前正要执行的事务不在另外一个事务里,那么就起一个新的事务 .
ServiceA { void methodA() { ServiceB.methodB(); } } ServiceB { void methodB() { } } 比如说,ServiceB.methodB的事务级别定义为PROPAGATION_REQUIRED, 那么由于执行ServiceA.methodA的时候 1、如果ServiceA.methodA已经起了事务,这时调用ServiceB.methodB,ServiceB.methodB看到自己已经运行在ServiceA.methodA的事务内部,就不再起新的事务。这时只有外部事务并且他们是共用的,所以这时ServiceA.methodA或者ServiceB.methodB无论哪个发生异常methodA和methodB作为一个整体都将一起回滚。 2、如果ServiceA.methodA没有事务,ServiceB.methodB就会为自己分配一个事务。这样,在ServiceA.methodA中是没有事务控制的。只是在ServiceB.methodB内的任何地方出现异常,ServiceB.methodB将会被回滚,不会引起ServiceA.methodA的回滚.
在 spring的 TransactionDefinition接口中一共定义了六种事务传播属性: PROPAGATION_REQUIRED -- 支持当前事务,如果当前没有事务,就新建一个事务。这是最常见的选择。 PROPAGATION_SUPPORTS -- 支持当前事务,如果当前没有事务,就以非事务方式执行。 PROPAGATION_MANDATORY -- 支持当前事务,如果当前没有事务,就抛出异常。 PROPAGATION_REQUIRES_NEW -- 新建事务,如果当前存在事务,把当前事务挂起。 PROPAGATION_NOT_SUPPORTED -- 以非事务方式执行操作,如果当前存在事务,就把当前事务挂起。 PROPAGATION_NEVER -- 以非事务方式执行,如果当前存在事务,则抛出异常。 PROPAGATION_NESTED -- 如果当前存在事务,则在嵌套事务内执行。如果当前没有事务,则进行与PROPAGATION_REQUIRED类似的操作。 前六个策略类似于EJB CMT,第七个(PROPAGATION_NESTED)是Spring所提供的一个特殊变量。 它要求事务管理器或者使用JDBC 3.0 Savepoint API提供嵌套事务行为(如Spring的DataSourceTransactionManager).
。
3、分布式锁工具 。
使用jedis工具包作为分布式锁代码如下:
。
/**
* 在指定时间内等待获取锁
* @param waitTime 等待锁的时间
*
*/
public boolean tryLock(long waitTime, TimeUnit unit) throws InterruptedException {
// 系统当前时间,以毫微秒为单位。
long nano = System.nanoTime();
do {
if(tryLock()){
logger.debug(this.lockValue + "获取锁");
return Boolean.TRUE;
}
Thread.sleep(new Random().nextInt(100) + 1);
} while ((System.nanoTime() - nano) < unit.toNanos(waitTime));
return Boolean.FALSE;
}
/**
* 阻塞式加锁
*/
public void lock() {
while(!tryLock()){
try {
// 睡眠,降低抢锁频率,缓解redis压力
Thread.sleep(new Random().nextInt(100) + 1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/**
* 获取锁的线程解锁
* 不足:当时阿里云redis集群版本暂不支持执行lua脚本eval函数。
* 参考:https://help.aliyun.com/document_detail/26356.html?spm=5176.11065259.1996646101.searchclickresult.3dd24026cWCgRN
*
*/
public void unlock() {
// 检查当前线程是否持有锁
if (this.lockValue.equals(jedis.get(this.lockName))) {
logger.debug(this.lockValue + "释放锁");
try {
jedis.del(this.lockName);
} finally {
// Jedis 客户端版本是使用 Jedis-2.7.2版本;如果是2.9以上本的版注意这里不是关闭连接,在JedisPool模式下,Jedis会被归还给资源池。
if (jedis != null) {
jedis.close();
}
}
} else {
logger.debug(Thread.currentThread().getName() + "并非持有锁的线程,未能解锁");
}
}
。
上面展示代码展示分布式销和事务一些使用, 但请注意这里有坑!!! 。
。
4、存在哪些坑?
坑1 : Jedis客户端版本要注意一下,如果是3.0.1及以上的话 jedis.close();已经被重写,请看客方源代码.
2.9 版本以前连接池使用有returnResource接口方法,3.0.1之后版本被去掉了.
官方重写了close方法,jedis.close不能直接调用.
try { jedis = pool.getResource(); } finally { if (jedis != null) { jedis.close(); } } 。
某一次升级了jedis client版本还导致生产环境redis服务被打暴,引发重大事故,所以如果使用jedis client建议使用2.9以下版本还靠谱一些.
其实这种写法,还有一些问题的,需要进一步改进。如何改进?我们往下一步分析.
改进版本1(使用lua脚本替代):
/**
* 尝试获取分布式锁
* @param jedis Redis客户端
* @param lockKey 锁
* @param requestId 请求标识
* @param expireTime 超期时间
* @return 是否获取成功
*/
public static boolean tryLock(Jedis jedis, String lockKey, String requestId, long expireTime) {
//jedis 3.0之后写法
/* SetParams params = new SetParams();
params.px(expireTime);
params.nx();*/
String result = jedis.set(lockKey, requestId,"NX", "PX", expireTime);
if (LOCK_SUCCESS.equals(result)) {
return true;
}
return false;
}
/**
* 释放分布式锁(LUA脚本实现)
* @param jedis Redis客户端
* @param lockKey 锁
* @param requestId 请求标识
* @return
*/
public static boolean unLock(Jedis jedis, String lockKey, String requestId) {
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return " +
"0 end";
Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId));
if (RELEASE_SUCCESS.equals(result)) {
return true;
}
if (jedis != null) {
jedis.close();
}
return false;
}
。
当时阿里云redis集群版本暂不支持执行lua脚本eval函数。所以当时lua脚本方式的无发挥之地! 参考:https://help.aliyun.com/document_detail/26356.html?spm=5176.11065259.1996646101.searchclickresult.3dd24026cWCgRN 。
。
改进版本2(使用redisson替代):
/**
* 利用redisson client 实现分布式锁
* @author cgli
*/
public final class RedisLocker {
private final static String LOCKER_PREFIX = "lock:";
private static final Logger logger = LoggerFactory.getLogger(RedisLocker.class);
private RedisLocker() {
}
/**
* 获取连接配置实例
* @return
*/
private static RedissonClient getClient() {
return SingletonHolder.client;
}
/**
* 根据name对进行上锁操作,redissonLock 阻塞式的,采用的机制发布/订阅
* timeout结束强制解锁,防止死锁 :1分钟
* @param lockName 锁名称
*/
public static void lock(String lockName){
lock(lockName,60);
}
/**
* 根据name对进行上锁操作采用redisson RLOCK加锁方式
* @param lockName 锁名称
* @param leaseTime 结束强制解锁,防止死锁 :单位秒
*/
public static void lock(String lockName,long leaseTime){
String key = LOCKER_PREFIX + lockName;
RLock lock = getClient().getLock(key);
//lock提供带timeout参数,timeout结束强制解锁,防止死锁 :1分钟
lock.lock(leaseTime, TimeUnit.SECONDS);
}
/**
* 根据name对进行解锁操作
* @param lockName
*/
public static void unlock(String lockName){
String key = LOCKER_PREFIX + lockName;
RLock lock = getClient().getLock(key);
if(lock.isLocked()){
if(lock.isHeldByCurrentThread()){
lock.unlock();
}
}
}
/**
*
* @param resourceName 锁的KEY名称
* @param worker 回调外部工作任务
* @param lockTime 锁定超时时间,默认acquireTimeout=100second 获取锁的超时时间
* @param <T>
* @return
* @throws Exception
*/
public static <T> T lock(String resourceName, AquiredLockWorker<T> worker,
long lockTime) throws Exception {
return lock(resourceName, worker, 100, lockTime);
}
/**
*
* @param resourceName 锁的KEY名称
* @param worker 回调外部工作任务
* @param acquireTimeout 获取锁的超时时间
* @param lockTime 锁定超时时间
* @param <T>
* @return
* @throws Exception
*/
public static <T> T lock(String resourceName, AquiredLockWorker<T> worker, long acquireTimeout,
long lockTime) throws Exception {
RLock lock = getClient().getLock(LOCKER_PREFIX + resourceName);
// Acquire lock and release it automatically after 10 seconds
// if unlock method hasn't been invoked
//lock.lock(10, TimeUnit.SECONDS);
try {
// Wait for acquireTimeout seconds and automatically unlock it after lockTime seconds
boolean res = lock.tryLock(acquireTimeout, lockTime, TimeUnit.SECONDS);
if (res) {
return worker.execute();
}
} finally {
if (lock != null) {
lock.unlock();
}
}
return null;
}
/**
* 内部类实现单例模式
*/
static class SingletonHolder {
private static RedissonClient client = init();
private static RedissonClient init() {
RedisProperties properties = ApplicationContextHolder.getApplicationContext().getBean(RedisProperties.class);
String host = properties.getRedisHost();
int port = Integer.parseInt(properties.getRedisPort());
String password = properties.getRedisPassword();
if (StringUtils.isEmpty(password)) {
password = null;
}
int database = Integer.parseInt(properties.getRedisDataBase());
try {
Config config = new Config();
config.useSingleServer()
.setAddress("redis://" + host + ":" + port)
.setPassword(password)
.setDatabase(database)
//同任何节点建立连接时的等待超时。时间单位是毫秒。默认:10000
.setConnectTimeout(30000)
//当与某个节点的连接断开时,等待与其重新建立连接的时间间隔。时间单位是毫秒。默认:3000
//等待节点回复命令的时间。该时间从命令发送成功时开始计时。默认:3000
.setTimeout(10000)
//如果尝试达到 retryAttempts(命令失败重试次数) 仍然不能将命令发送至某个指定的节点时,将抛出错误。如果尝试在此限制之内发送成功,则开始启用 timeout(命令等待超时)
// 计时。默认值:3
.setRetryAttempts(5)
//在一条命令发送失败以后,等待重试发送的时间间隔。时间单位是毫秒。 默认值:1500
.setRetryInterval(3000);
return Redisson.create(config);
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
return null;
}
}
}
。
坑2 :使用@Transactional 注解 嵌套事务的问题,会导致一些事务混乱,达不到最终的数据效果.
回去最开始代码处 。
@Transactional //这里加了事务注解 public String create(String rule, String... params) { 。
try { //加jedis客户端工具分布式锁 RedisLocker.lock(lockName, 10); code = getNext(rule, params); } finally { //释放锁 RedisLocker.unlock(lockName); } 。
} 。
这种结果达不到预期的结果,就是会出现 有时会产生重复的编码 ,如本文提出的问题。这是为什么呢?
由于spring的AOP机制,会在 update/save方法之前开启事务 ,在这之后再加锁,当锁住的代码执行完成后, 再提交事务 ,因此锁代码块执行是在事务之内执行的.
可以推断在代码块执行完时, 事务还未提交 ,这时如果其他线程进入锁代码块后,读取的库存数据就不是最新的,就可能产生了不是你想要的结果数据.
。
这个问题我们验证测试一下,写一个测试用例 。
用jmeter压力测试跑一下,取序列号的结果 。
。
。
在日志分析中,经常是有两个线程查到上一个相同的序列号,拿到的不是更新之后的数据结果,如下图
。
产生重复编码,说明jedis分布式锁并没有真实锁住。问题就是出现在最外层的 @ Transactional注解上,最外层调用代码完全没有必要加了@Transactional.
因为最内层已经加了手动控制事务的控制。拿掉外层的@Transactional再跑压力测试一切正常.
。
三 、分析代码逻辑结构 。
。
1、 Redis java客户端不推荐使用jedis,特别是2.9以上的版本,代码没有处理好很容易搞把redis服务搞死(连接数打满),推荐使用redisson代替,性能高且内置实现连接池.
如果真的一定要用jedis使用2.9以下版本并使用 lua脚本来控制,才能实现真正原子性操作.
2、事务注解@Transactional事务控制,嵌套使用时要注意,尽量控制在最小单元的最内层使用,在最外层(大方法)使用有风险,特别是跟锁一起使用时要注意控制两者顺序.
另外@Transactional在分布式环境下,远程调用无效的,并不能当作分布式事务来对待,这个业内有成熟其他方案替代.
3、其实取号生成序列号服务,没有必要使用数据库当作序列计数,完全可以使用redis计数器做实现(内部版本2年前就用redis版本来取号,连续运行两2年多没有发现有啥问题).
示例代码如下:
private String generateByRedis(BusinessCodeCondition condition) {
String time = "";
String prefix = "";
String type = condition.getType();
StringBuilder sb = new StringBuilder();
sb.append(type);
if (StringUtils.isNotEmpty(condition.getShopCode())) {
sb.append(condition.getShopCode());
}
if (StringUtils.isNotEmpty(condition.getDateFormat())) {
time = DateFormatUtil.formatDate(condition.getDateFormat(), new Date());
sb.append(time);
}
prefix = sb.toString();
sb.setLength(0);
String key = KEY_PREFIX + prefix;
//long n = redisTemplate.opsForValue().increment(key, 1L);
//redisTemplate.expire(key, EXPIRE_TIME, TimeUnit.DAYS);
long n = getIncrementNum(key, condition.getDateFormat());
return String.format("%s%0" + condition.getDigitCount() + "d", prefix, n);
}
//使用redis计数器取序列,每天过期前一天的key值回收。
private Long getIncrementNum(String key, String dateFormat) {
RedisAtomicLong entityIdCounter = new RedisAtomicLong(key, redisTemplate.getConnectionFactory());
Long counter = entityIdCounter.incrementAndGet();
if ((null == counter || counter.longValue() == 1)) {
if (StringUtils.isNotEmpty(dateFormat) && dateFormat.indexOf("yyMMdd") > -1) {
entityIdCounter.expire(EXPIRE_TIME, TimeUnit.DAYS);
}
}
return counter;
}
。
最后此篇关于高并发环境下生成序列编码重复问题分析的文章就讲到这里了,如果你想了解更多关于高并发环境下生成序列编码重复问题分析的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
关闭。这个问题是off-topic .它目前不接受答案。 想要改进这个问题? Update the question所以它是on-topic用于堆栈溢出。 关闭 12 年前。 Improve thi
我有一个动态网格,其中的数据功能需要正常工作,这样我才能逐步复制网格中的数据。假设在第 5 行中,我输入 10,则从第 6 行开始的后续行应从 11 开始读取,依此类推。 如果我转到空白的第一行并输入
我有一个关于我的按钮消失的问题 我已经把一个图像作为我的按钮 用这个函数动画 function example_animate(px) { $('#cont
我有一个具有 Facebook 连接和经典用户名/密码登录的网站。目前,如果用户单击 facebook_connect 按钮,系统即可运行。但是,我想将现有帐户链接到 facebook,因为用户可以选
我有一个正在为 iOS 开发的应用程序,该应用程序执行以下操作 加载和设置注释并启动核心定位和缩放到位置。 map 上有很多注释,从数据加载不会花很长时间,但将它们实际渲染到 map 上需要一段时间。
我被推荐使用 Heroku for Ruby on Rails 托管,到目前为止,我认为我真的会喜欢它。只是想知道是否有人可以帮助我找出问题所在。 我按照那里的说明在该网站上创建应用程序,创建并提交
我看过很多关于 SSL 错误的帖子和信息,我自己也偶然发现了一个。 我正在尝试使用 GlobalSign CA BE 证书通过 Android WebView 访问网页,但出现了不可信错误。 对于大多
我想开始使用 OpenGL 3+ 和 4,但我在使用 Glew 时遇到了问题。我试图将 glew32.lib 包含在附加依赖项中,并且我已将库和 .dll 移动到主文件夹中,因此不应该有任何路径问题。
我已经盯着这两个下载页面的源代码看了一段时间,但我似乎找不到问题。 我有两个下载页面,一个 javascript 可以工作,一个没有。 工作:http://justupload.it/v/lfd7不是
我一直在使用 jQuery,只是尝试在单击链接时替换文本字段以及隐藏/显示内容项。它似乎在 IE 中工作得很好,但我似乎无法让它在 FF 中工作。 我的 jQuery: $(function() {
我正在尝试为 NDK 编译套接字库,但出现以下两个错误: error: 'close' was not declared in this scope 和 error: 'min' is not a m
我正在使用 Selenium 浏览器自动化框架测试网站。在测试过程中,我切换到特定的框架,我们将其称为“frame_1”。后来,我在 Select 类中使用了 deselectAll() 方法。不久之
我正在尝试通过 Python 创建到 Heroku PostgreSQL 数据库的连接。我将 Windows10 与 Python 3.6.8 和 PostgreSQL 9.6 一起使用。 我从“ht
我有一个包含 2 列的数据框,我想根据两列之间的比较创建第三列。 所以逻辑是:第 1 列 val = 3,第 2 列 val = 4,因此新列值什么都没有 第 1 列 val = 3,第 2 列 va
我想知道如何调试 iphone 5 中的 css 问题。 我尝试使用 firelite 插件。但是从纵向旋转到横向时,火石占据了整个屏幕。 有没有其他方法可以调试 iphone 5 中的 css 问题
所以我有点难以理解为什么这不起作用。我正在尝试替换我正在处理的示例站点上的类别复选框。我试图让它做以下事情:未选中时以一种方式出现,悬停时以另一种方式出现(选中或未选中)选中时以第三种方式出现(而不是
Javascript CSS 问题: 我正在使用一个文本框来写入一个 div。我使用以下 javascript 获取文本框来执行此操作: function process_input(){
你好,我很难理解 P、NP 和多项式时间缩减的主题。我试过在网上搜索它并问过我的一些 friend ,但我没有得到任何好的答案。 我想问一个关于这个话题的一般性问题: 设 A,B 为 P 中的语言(或
你好,我一直在研究 https://leetcode.com/problems/2-keys-keyboard/并想到了这个动态规划问题。 您从空白页上的“A”开始,完成后得到一个数字 n,页面上应该
我正在使用 Cocoapods 和 KIF 在 Xcode 服务器上运行持续集成。我已经成功地为一个项目设置了它来报告每次提交。我现在正在使用第二个项目并收到错误: Bot Issue: warnin
我是一名优秀的程序员,十分优秀!