- ubuntu12.04环境下使用kvm ioctl接口实现最简单的虚拟机
- Ubuntu 通过无线网络安装Ubuntu Server启动系统后连接无线网络的方法
- 在Ubuntu上搭建网桥的方法
- ubuntu 虚拟机上网方式及相关配置详解
CFSDN坚持开源创造价值,我们致力于搭建一个资源共享平台,让每一个IT人在这里找到属于你的精彩世界.
这篇CFSDN的博客文章zookeeper实现分布式锁由作者收集整理,如果你对这篇文章有兴趣,记得点赞哟.
1、分布式锁介绍 。
分布式锁主要用于在分布式环境中保护跨进程、跨主机、跨网络的共享资源实现互斥访问,以达到保证数据的一致性.
2、架构介绍 。
在介绍使用zookeeper实现分布式锁之前,首先看当前的系统架构图 。
。
解释: 左边的整个区域表示一个zookeeper集群,locker是zookeeper的一个持久节点,node_1、node_2、node_3是locker这个持久节点下面的临时顺序节点。client_1、client_2、client_n表示多个客户端,service表示需要互斥访问的共享资源.
3、分布式锁获取思路 。
1.获取分布式锁的总体思路 。
在获取分布式锁的时候在locker节点下创建临时顺序节点,释放锁的时候删除该临时节点。客户端调用createnode方法在locker下创建临时顺序节点,然后调用getchildren(“locker”)来获取locker下面的所有子节点,注意此时不用设置任何watcher。客户端获取到所有的子节点path之后,如果发现自己在之前创建的子节点序号最小,那么就认为该客户端获取到了锁。如果发现自己创建的节点并非locker所有子节点中最小的,说明自己还没有获取到锁,此时客户端需要找到比自己小的那个节点,然后对其调用exist()方法,同时对其注册事件监听器。之后,让这个被关注的节点删除,则客户端的watcher会收到相应通知,此时再次判断自己创建的节点是否是locker子节点中序号最小的,如皋是则获取到了锁,如果不是则重复以上步骤继续获取到比自己小的一个节点并注册监听。当前这个过程中还需要许多的逻辑判断.
2.获取分布式锁的核心算法流程 。
下面同个一个流程图来分析获取分布式锁的完整算法,如下:
解释:客户端a要获取分布式锁的时候首先到locker下创建一个临时顺序节点(node_n),然后立即获取locker下的所有(一级)子节点.
此时因为会有多个客户端同一时间争取锁,因此locker下的子节点数量就会大于1。对于顺序节点,特点是节点名称后面自动有一个数字编号,先创建的节点数字编号小于后创建的,因此可以将子节点按照节点名称后缀的数字顺序从小到大排序,这样排在第一位的就是最先创建的顺序节点,此时它就代表了最先争取到锁的客户端!此时判断最小的这个节点是否为客户端a之前创建出来的node_n,如果是则表示客户端a获取到了锁,如果不是则表示锁已经被其它客户端获取,因此客户端a要等待它释放锁,也就是等待获取到锁的那个客户端b把自己创建的那个节点删除.
此时就通过监听比node_n次小的那个顺序节点的删除事件来知道客户端b是否已经释放了锁,如果是,此时客户端a再次获取locker下的所有子节点,再次与自己创建的node_n节点对比,直到自己创建的node_n是locker的所有子节点中顺序号最小的,此时表示客户端a获取到了锁! 。
4、基于zookeeper的分布式锁的代码实现 。
1.定义分布式锁接口 。
定义的分布式锁接口如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
public
interface
distributedlock {
/**获取锁,如果没有得到就等待*/
public
void
acquire()
throws
exception;
/**
* 获取锁,直到超时
* @param time超时时间
* @param unit time参数的单位
* @return是否获取到锁
* @throws exception
*/
public
boolean
acquire (
long
time, timeunit unit)
throws
exception;
/**
* 释放锁
* @throws exception
*/
public
void
release()
throws
exception;
}
|
2.定义一个简单的互斥锁 。
定义一个互斥锁类,实现以上定义的锁接口,同时继承一个基类basedistributedlock,该基类主要用于与zookeeper交互,包含一个尝试获取锁的方法和一个释放锁。 。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
|
/**锁接口的具体实现,主要借助于继承的父类basedistributedlock来实现的接口方法
* 该父类是基于zookeeper实现分布式锁的具体细节实现*/
public
class
simpledistributedlockmutex
extends
basedistributedlock
implements
distributedlock {
/*用于保存zookeeper中实现分布式锁的节点,如名称为locker:/locker,
*该节点应该是持久节点,在该节点下面创建临时顺序节点来实现分布式锁 */
private final string basepath;
/*锁名称前缀,locker下创建的顺序节点例如都以lock-开头,这样便于过滤无关节点
*这样创建后的节点类似:lock-00000001,lock-000000002*/
private staticfinal string lock_name ="lock-";
/*用于保存某个客户端在locker下面创建成功的顺序节点,用于后续相关操作使用(如判断)*/
private string ourlockpath;
/**
* 用于获取锁资源,通过父类的获取锁方法来获取锁
* @param time获取锁的超时时间
* @param unit time的时间单位
* @return是否获取到锁
* @throws exception
*/
private boolean internallock (long time, timeunit unit) throws exception {
//如果ourlockpath不为空则认为获取到了锁,具体实现细节见attemptlock的实现
ourlockpath = attemptlock(time, unit);
return ourlockpath !=null;
}
/**
* 传入zookeeper客户端连接对象,和basepath
* @param client zookeeper客户端连接对象
* @param basepath basepath是一个持久节点
*/
public simpledistributedlockmutex(zkclientext client, string basepath){
/*调用父类的构造方法在zookeeper中创建basepath节点,并且为basepath节点子节点设置前缀
*同时保存basepath的引用给当前类属性*/
super(client,basepath,lock_name);
this.basepath = basepath;
}
/**获取锁,直到超时,超时后抛出异常*/
public void acquire() throws exception {
//-1表示不设置超时时间,超时由zookeeper决定
if (!internallock(-1,null)){
throw new ioexception("连接丢失!在路径:'"+basepath+"'下不能获取锁!");
}
}
/**
* 获取锁,带有超时时间
*/
public boolean acquire(long time, timeunit unit) throws exception {
return internallock(time, unit);
}
/**释放锁*/
public
void
release()
throws
exception {
releaselock(ourlockpath);
}
}
|
3. 分布式锁的实现细节 。
获取分布式锁的重点逻辑在于basedistributedlock,实现了基于zookeeper实现分布式锁的细节.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
|
public
class
basedistributedlock {
private
final
zkclientext client;
private
final
string path;
private
final
string basepath;
private
final
string lockname;
private
static
final
integer max_retry_count =
10
;
public
basedistributedlock(zkclientext client, string path, string lockname){
this
.client = client;
this
.basepath = path;
this
.path = path.concat(
"/"
).concat(lockname);
this
.lockname = lockname;
}
private
void
deleteourpath(string ourpath)
throws
exception{
client.delete(ourpath);
}
private
string createlocknode(zkclient client, string path)
throws
exception{
return
client.createephemeralsequential(path,
null
);
}
/**
* 获取锁的核心方法
* @param startmillis
* @param millistowait
* @param ourpath
* @return
* @throws exception
*/
private
boolean
waittolock(
long
startmillis,
long
millistowait, string ourpath)
throws
exception{
boolean
havethelock =
false
;
boolean
dodelete =
false
;
try
{
while
( !havethelock ) {
//该方法实现获取locker节点下的所有顺序节点,并且从小到大排序
list<string> children = getsortedchildren();
string sequencenodename = ourpath.substring(basepath.length()+
1
);
//计算刚才客户端创建的顺序节点在locker的所有子节点中排序位置,如果是排序为0,则表示获取到了锁
int
ourindex = children.indexof(sequencenodename);
/*如果在getsortedchildren中没有找到之前创建的[临时]顺序节点,这表示可能由于网络闪断而导致
*zookeeper认为连接断开而删除了我们创建的节点,此时需要抛出异常,让上一级去处理
*上一级的做法是捕获该异常,并且执行重试指定的次数 见后面的 attemptlock方法 */
if ( ourindex<0 ){
throw new zknonodeexception("节点没有找到: " + sequencenodename);
}
//如果当前客户端创建的节点在locker子节点列表中位置大于0,表示其它客户端已经获取了锁
//此时当前客户端需要等待其它客户端释放锁,
boolean isgetthelock = ourindex == 0;
//如何判断其它客户端是否已经释放了锁?从子节点列表中获取到比自己次小的哪个节点,并对其建立监听
string pathtowatch = isgetthelock ? null : children.get(ourindex - 1);
if ( isgetthelock ){
havethelock = true;
}else{
//如果次小的节点被删除了,则表示当前客户端的节点应该是最小的了,所以使用countdownlatch来实现等待
string previoussequencepath = basepath .concat( "/" ) .concat( pathtowatch );
final countdownlatch latch = new countdownlatch(1);
final izkdatalistener previouslistener = new izkdatalistener() {
//次小节点删除事件发生时,让countdownlatch结束等待
//此时还需要重新让程序回到while,重新判断一次!
public void handledatadeleted(string datapath) throws exception {
latch.countdown();
}
public void handledatachange(string datapath, object data) throws exception {
// ignore
}
};
try{
//如果节点不存在会出现异常
client.subscribedatachanges(previoussequencepath, previouslistener);
if ( millistowait != null ){
millistowait -= (system.currenttimemillis() - startmillis);
startmillis = system.currenttimemillis();
if ( millistowait <= 0 ){
dodelete = true; // timed out - delete our node
break;
}
latch.await(millistowait, timeunit.microseconds);
}else{
latch.await();
}
}catch ( zknonodeexception e ){
//ignore
}finally{
client.unsubscribedatachanges(previoussequencepath, previouslistener);
}
}
}
}catch ( exception e ){
//发生异常需要删除节点
dodelete = true;
throw e;
}finally{
//如果需要删除节点
if ( dodelete ){
deleteourpath(ourpath);
}
}
return havethelock;
}
private string getlocknodenumber(string str, string lockname) {
int index = str.lastindexof(lockname);
if ( index >= 0 ){
index += lockname.length();
return index <= str.length() ? str.substring(index) : "";
}
return str;
}
private list<string> getsortedchildren() throws exception {
try{
list<string> children = client.getchildren(basepath);
collections.sort(
children,
new comparator<string>(){
public int compare(string lhs, string rhs){
return getlocknodenumber(lhs, lockname).compareto(getlocknodenumber(rhs, lockname));
}
}
);
return children;
}catch(zknonodeexception e){
client.createpersistent(basepath, true);
return getsortedchildren();
}
}
protected void releaselock(string lockpath) throws exception{
deleteourpath(lockpath);
}
protected string attemptlock(long time, timeunit unit) throws exception{
final long startmillis = system.currenttimemillis();
final long millistowait = (unit != null) ? unit.tomillis(time) : null;
string ourpath = null;
boolean hasthelock = false;
boolean isdone = false;
int retrycount = 0;
//网络闪断需要重试一试
while ( !isdone ){
isdone = true;
try{
//createlocknode用于在locker(basepath持久节点)下创建客户端要获取锁的[临时]顺序节点
ourpath = createlocknode(client, path);
/**
* 该方法用于判断自己是否获取到了锁,即自己创建的顺序节点在locker的所有子节点中是否最小
* 如果没有获取到锁,则等待其它客户端锁的释放,并且稍后重试直到获取到锁或者超时
*/
hasthelock = waittolock(startmillis, millistowait, ourpath);
}
catch
( zknonodeexception e ){
if
( retrycount++ < max_retry_count ){
isdone =
false
;
}
else
{
throw
e;
}
}
}
if
( hasthelock ){
return
ourpath;
}
return
null
;
}
|
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我.
原文链接:https://blog.csdn.net/sunfeizhi/article/details/51926396 。
最后此篇关于zookeeper实现分布式锁的文章就讲到这里了,如果你想了解更多关于zookeeper实现分布式锁的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
背景: 我最近一直在使用 JPA,我为相当大的关系数据库项目生成持久层的轻松程度给我留下了深刻的印象。 我们公司使用大量非 SQL 数据库,特别是面向列的数据库。我对可能对这些数据库使用 JPA 有一
我已经在我的 maven pom 中添加了这些构建配置,因为我希望将 Apache Solr 依赖项与 Jar 捆绑在一起。否则我得到了 SolarServerException: ClassNotF
interface ITurtle { void Fight(); void EatPizza(); } interface ILeonardo : ITurtle {
我希望可用于 Java 的对象/关系映射 (ORM) 工具之一能够满足这些要求: 使用 JPA 或 native SQL 查询获取大量行并将其作为实体对象返回。 允许在行(实体)中进行迭代,并在对当前
好像没有,因为我有实现From for 的代码, 我可以转换 A到 B与 .into() , 但同样的事情不适用于 Vec .into()一个Vec . 要么我搞砸了阻止实现派生的事情,要么这不应该发
在 C# 中,如果 A 实现 IX 并且 B 继承自 A ,是否必然遵循 B 实现 IX?如果是,是因为 LSP 吗?之间有什么区别吗: 1. Interface IX; Class A : IX;
就目前而言,这个问题不适合我们的问答形式。我们希望答案得到事实、引用资料或专业知识的支持,但这个问题可能会引发辩论、争论、投票或扩展讨论。如果您觉得这个问题可以改进并可能重新打开,visit the
我正在阅读标准haskell库的(^)的实现代码: (^) :: (Num a, Integral b) => a -> b -> a x0 ^ y0 | y0 a -> b ->a expo x0
我将把国际象棋游戏表示为 C++ 结构。我认为,最好的选择是树结构(因为在每个深度我们都有几个可能的移动)。 这是一个好的方法吗? struct TreeElement{ SomeMoveType
我正在为用户名数据库实现字符串匹配算法。我的方法采用现有的用户名数据库和用户想要的新用户名,然后检查用户名是否已被占用。如果采用该方法,则该方法应该返回带有数据库中未采用的数字的用户名。 例子: “贾
我正在尝试实现 Breadth-first search algorithm , 为了找到两个顶点之间的最短距离。我开发了一个 Queue 对象来保存和检索对象,并且我有一个二维数组来保存两个给定顶点
我目前正在 ika 中开发我的 Python 游戏,它使用 python 2.5 我决定为 AI 使用 A* 寻路。然而,我发现它对我的需要来说太慢了(3-4 个敌人可能会落后于游戏,但我想供应 4-
我正在寻找 Kademlia 的开源实现C/C++ 中的分布式哈希表。它必须是轻量级和跨平台的(win/linux/mac)。 它必须能够将信息发布到 DHT 并检索它。 最佳答案 OpenDHT是
我在一本书中读到这一行:-“当我们要求 C++ 实现运行程序时,它会通过调用此函数来实现。” 而且我想知道“C++ 实现”是什么意思或具体是什么。帮忙!? 最佳答案 “C++ 实现”是指编译器加上链接
我正在尝试使用分支定界的 C++ 实现这个背包问题。此网站上有一个 Java 版本:Implementing branch and bound for knapsack 我试图让我的 C++ 版本打印
在很多情况下,我需要在 C# 中访问合适的哈希算法,从重写 GetHashCode 到对数据执行快速比较/查找。 我发现 FNV 哈希是一种非常简单/好/快速的哈希算法。但是,我从未见过 C# 实现的
目录 LRU缓存替换策略 核心思想 不适用场景 算法基本实现 算法优化
1. 绪论 在前面文章中提到 空间直角坐标系相互转换 ,测绘坐标转换时,一般涉及到的情况是:两个直角坐标系的小角度转换。这个就是我们经常在测绘数据处理中,WGS-84坐标系、54北京坐标系
在软件开发过程中,有时候我们需要定时地检查数据库中的数据,并在发现新增数据时触发一个动作。为了实现这个需求,我们在 .Net 7 下进行一次简单的演示. PeriodicTimer .
二分查找 二分查找算法,说白了就是在有序的数组里面给予一个存在数组里面的值key,然后将其先和数组中间的比较,如果key大于中间值,进行下一次mid后面的比较,直到找到相等的,就可以得到它的位置。
我是一名优秀的程序员,十分优秀!