gpt4 book ai didi

浅谈Java(SpringBoot)基于zookeeper的分布式锁实现

转载 作者:qq735679552 更新时间:2022-09-28 22:32:09 33 4
gpt4 key购买 nike

CFSDN坚持开源创造价值,我们致力于搭建一个资源共享平台,让每一个IT人在这里找到属于你的精彩世界.

这篇CFSDN的博客文章浅谈Java(SpringBoot)基于zookeeper的分布式锁实现由作者收集整理,如果你对这篇文章有兴趣,记得点赞哟.

通过zookeeper实现分布式锁 。

1、创建zookeeper的client 。

首先通过curatorframeworkfactory创建一个连接zookeeper的连接curatorframework client 。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
public class curatorfactorybean implements factorybean<curatorframework>, initializingbean, disposablebean {
  private static final logger logger = loggerfactory.getlogger(contractfileinfocontroller. class );
 
  private string connectionstring;
  private int sessiontimeoutms;
  private int connectiontimeoutms;
  private retrypolicy retrypolicy;
  private curatorframework client;
 
  public curatorfactorybean(string connectionstring) {
   this (connectionstring, 500 , 500 );
  }
 
  public curatorfactorybean(string connectionstring, int sessiontimeoutms, int connectiontimeoutms) {
   this .connectionstring = connectionstring;
   this .sessiontimeoutms = sessiontimeoutms;
   this .connectiontimeoutms = connectiontimeoutms;
  }
 
  @override
  public void destroy() throws exception {
   logger.info( "closing curator framework..." );
   this .client.close();
   logger.info( "closed curator framework." );
  }
 
  @override
  public curatorframework getobject() throws exception {
   return this .client;
  }
 
  @override
  public class <?> getobjecttype() {
    return this .client != null ? this .client.getclass() : curatorframework. class ;
  }
 
  @override
  public boolean issingleton() {
   return true ;
  }
 
  @override
  public void afterpropertiesset() throws exception {
   if (stringutils.isempty( this .connectionstring)) {
    throw new illegalstateexception( "connectionstring can not be empty." );
   } else {
    if ( this .retrypolicy == null ) {
     this .retrypolicy = new exponentialbackoffretry( 1000 , 2147483647 , 180000 );
    }
 
    this .client = curatorframeworkfactory.newclient( this .connectionstring, this .sessiontimeoutms, this .connectiontimeoutms, this .retrypolicy);
    this .client.start();
    this .client.blockuntilconnected( 30 , timeunit.milliseconds);
   }
  }
  public void setconnectionstring(string connectionstring) {
   this .connectionstring = connectionstring;
  }
 
  public void setsessiontimeoutms( int sessiontimeoutms) {
   this .sessiontimeoutms = sessiontimeoutms;
  }
 
  public void setconnectiontimeoutms( int connectiontimeoutms) {
   this .connectiontimeoutms = connectiontimeoutms;
  }
 
  public void setretrypolicy(retrypolicy retrypolicy) {
   this .retrypolicy = retrypolicy;
  }
 
  public void setclient(curatorframework client) {
   this .client = client;
  }
}

2、封装分布式锁 。

根据curatorframework创建interprocessmutex(分布式可重入排它锁)对一行数据进行上锁 。

?
1
2
3
public interprocessmutex(curatorframework client, string path) {
  this (client, path, new standardlockinternalsdriver());
}

使用 acquire方法 1、acquire() :入参为空,调用该方法后,会一直堵塞,直到抢夺到锁资源,或者zookeeper连接中断后,上抛异常。 2、acquire(long time, timeunit unit):入参传入超时时间、单位,抢夺时,如果出现堵塞,会在超过该时间后,返回false.

?
1
2
3
4
5
6
7
8
9
public void acquire() throws exception {
  if (! this .internallock(-1l, (timeunit) null )) {
   throw new ioexception( "lost connection while trying to acquire lock: " + this .basepath);
  }
}
 
public boolean acquire( long time, timeunit unit) throws exception {
  return this .internallock(time, unit);
}

释放锁 mutex.release(),

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public void release() throws exception {
   thread currentthread = thread.currentthread();
   interprocessmutex.lockdata lockdata = (interprocessmutex.lockdata) this .threaddata.get(currentthread);
   if (lockdata == null ) {
    throw new illegalmonitorstateexception( "you do not own the lock: " + this .basepath);
   } else {
    int newlockcount = lockdata.lockcount.decrementandget();
    if (newlockcount <= 0 ) {
     if (newlockcount < 0 ) {
      throw new illegalmonitorstateexception( "lock count has gone negative for lock: " + this .basepath);
     } else {
      try {
       this .internals.releaselock(lockdata.lockpath);
      } finally {
       this .threaddata.remove(currentthread);
      }
 
     }
    }
   }
  }

封装后的dlock代码 1、调用interprocessmutex processmutex = dlock.mutex(path),

2、手动释放锁processmutex.release(),

3、需要手动删除路径dlock.del(path),

推荐 使用: 都是 函数式编程 在业务代码执行完毕后 会释放锁和删除path 1、这个有返回结果 public t mutex(string path, zklockcallback zklockcallback, long time, timeunit timeunit) 2、这个无返回结果 public void mutex(string path, zkvoidcallback zklockcallback, long time, timeunit timeunit) 。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
public class dlock {
  private final logger logger;
  private static final long timeout_d = 100l;
  private static final string root_path_d = "/dlock" ;
  private string lockrootpath;
  private curatorframework client;
 
  public dlock(curatorframework client) {
   this ( "/dlock" , client);
  }
 
  public dlock(string lockrootpath, curatorframework client) {
   this .logger = loggerfactory.getlogger(dlock. class );
   this .lockrootpath = lockrootpath;
   this .client = client;
  }
  public interprocessmutex mutex(string path) {
   if (!stringutils.startswith(path, "/" )) {
    path = constant.keybuilder( new object[]{ "/" , path});
   }
 
   return new interprocessmutex( this .client, constant.keybuilder( new object[]{ this .lockrootpath, "" , path}));
  }
 
  public <t> t mutex(string path, zklockcallback<t> zklockcallback) throws zklockexception {
   return this .mutex(path, zklockcallback, 100l, timeunit.milliseconds);
  }
 
  public <t> t mutex(string path, zklockcallback<t> zklockcallback, long time, timeunit timeunit) throws zklockexception {
   string finalpath = this .getlockpath(path);
   interprocessmutex mutex = new interprocessmutex( this .client, finalpath);
 
   try {
    if (!mutex.acquire(time, timeunit)) {
     throw new zklockexception( "acquire zk lock return false" );
    }
   } catch (exception var13) {
    throw new zklockexception( "acquire zk lock failed." , var13);
   }
 
   t var8;
   try {
    var8 = zklockcallback.doinlock();
   } finally {
    this .releaselock(finalpath, mutex);
   }
 
   return var8;
  }
 
  private void releaselock(string finalpath, interprocessmutex mutex) {
   try {
    mutex.release();
    this .logger.info( "delete zk node path:{}" , finalpath);
    this .deleteinternal(finalpath);
   } catch (exception var4) {
    this .logger.error( "dlock" , "release lock failed, path:{}" , finalpath, var4);
//   logutil.error(this.logger, "dlock", "release lock failed, path:{}", new object[]{finalpath, var4});
   }
 
  }
 
  public void mutex(string path, zkvoidcallback zklockcallback, long time, timeunit timeunit) throws zklockexception {
   string finalpath = this .getlockpath(path);
   interprocessmutex mutex = new interprocessmutex( this .client, finalpath);
 
   try {
    if (!mutex.acquire(time, timeunit)) {
     throw new zklockexception( "acquire zk lock return false" );
    }
   } catch (exception var13) {
    throw new zklockexception( "acquire zk lock failed." , var13);
   }
 
   try {
    zklockcallback.response();
   } finally {
    this .releaselock(finalpath, mutex);
   }
 
  }
 
  public string getlockpath(string custompath) {
   if (!stringutils.startswith(custompath, "/" )) {
    custompath = constant.keybuilder( new object[]{ "/" , custompath});
   }
 
   string finalpath = constant.keybuilder( new object[]{ this .lockrootpath, "" , custompath});
   return finalpath;
  }
 
  private void deleteinternal(string finalpath) {
   try {
    ((errorlistenerpathable) this .client.delete().inbackground()).forpath(finalpath);
   } catch (exception var3) {
    this .logger.info( "delete zk node path:{} failed" , finalpath);
   }
 
  }
 
  public void del(string custompath) {
   string lockpath = "" ;
 
   try {
    lockpath = this .getlockpath(custompath);
    ((errorlistenerpathable) this .client.delete().inbackground()).forpath(lockpath);
   } catch (exception var4) {
    this .logger.info( "delete zk node path:{} failed" , lockpath);
   }
 
  }
}
?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@functionalinterface
public interface zklockcallback<t> {
  t doinlock();
}
 
@functionalinterface
public interface zkvoidcallback {
  void response();
}
 
public class zklockexception extends exception {
  public zklockexception() {
  }
 
  public zklockexception(string message) {
   super (message);
  }
 
  public zklockexception(string message, throwable cause) {
   super (message, cause);
  }
}

配置curatorconfig 。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@configuration
public class curatorconfig {
  @value ( "${zk.connectionstring}" )
  private string connectionstring;
 
  @value ( "${zk.sessiontimeoutms:500}" )
  private int sessiontimeoutms;
 
  @value ( "${zk.connectiontimeoutms:500}" )
  private int connectiontimeoutms;
 
  @value ( "${zk.dlockroot:/dlock}" )
  private string dlockroot;
 
  @bean
  public curatorfactorybean curatorfactorybean() {
   return new curatorfactorybean(connectionstring, sessiontimeoutms, connectiontimeoutms);
  }
 
  @bean
  @autowired
  public dlock dlock(curatorframework client) {
   return new dlock(dlockroot, client);
  }
}

测试代码 。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
@restcontroller
@requestmapping ( "/dlock" )
public class lockcontroller {
 
  @autowired
  private dlock dlock;
 
  @requestmapping ( "/lock" )
  public map testdlock(string no){
   final string path = constant.keybuilder( "/test/no/" , no);
   long mutex=0l;
   try {
    system.out.println( "在拿锁:" +path+system.currenttimemillis());
     mutex = dlock.mutex(path, () -> {
     try {
      system.out.println( "拿到锁了" + system.currenttimemillis());
      thread.sleep( 10000 );
      system.out.println( "操作完成了" + system.currenttimemillis());
     } finally {
      return system.currenttimemillis();
     }
    }, 1000 , timeunit.milliseconds);
   } catch (zklockexception e) {
    system.out.println( "拿不到锁呀" +system.currenttimemillis());
   }
   return collections.singletonmap( "ret" ,mutex);
  }
 
  @requestmapping ( "/dlock" )
  public map testdlock1(string no){
   final string path = constant.keybuilder( "/test/no/" , no);
   long mutex=0l;
   try {
    system.out.println( "在拿锁:" +path+system.currenttimemillis());
    interprocessmutex processmutex = dlock.mutex(path);
    processmutex.acquire();
    system.out.println( "拿到锁了" + system.currenttimemillis());
    thread.sleep( 10000 );
    processmutex.release();
    system.out.println( "操作完成了" + system.currenttimemillis());
   } catch (zklockexception e) {
    system.out.println( "拿不到锁呀" +system.currenttimemillis());
    e.printstacktrace();
   } catch (exception e){
    e.printstacktrace();
   }
   return collections.singletonmap( "ret" ,mutex);
  }
  @requestmapping ( "/del" )
  public map deldlock(string no){
   final string path = constant.keybuilder( "/test/no/" , no);
   dlock.del(path);
   return collections.singletonmap( "ret" , 1 );
  }
}

以上所述是小编给大家介绍的java(springboot)基于zookeeper的分布式锁实现详解整合,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对我网站的支持! 。

原文链接:https://blog.csdn.net/LJY_SUPER/article/details/87807091 。

最后此篇关于浅谈Java(SpringBoot)基于zookeeper的分布式锁实现的文章就讲到这里了,如果你想了解更多关于浅谈Java(SpringBoot)基于zookeeper的分布式锁实现的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。

33 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com