- 使用 Spring Initializr 创建 Spring Boot 应用程序
- 在Spring Boot中配置Cassandra
- 在 Spring Boot 上配置 Tomcat 连接池
- 将Camel消息路由到嵌入WildFly的Artemis上
本文整理了Java中com.twitter.distributedlog.lock.ZKSessionLock
类的一些代码示例,展示了ZKSessionLock
类的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。ZKSessionLock
类的具体详情如下:
包路径:com.twitter.distributedlog.lock.ZKSessionLock
类名称:ZKSessionLock
[英]A lock under a given zookeeper session. This is a one-time lock. It is not reusable: if lock failed, if zookeeper session is expired, if #unlock is called, it would be transitioned to expired or closed state. The Locking Procedure is described as below.
+-----------------+
| INIT | ------------------------------+
+--------+--------+ |
| |
| |
+--------v--------+ |
| PREPARING |----------------------------+ |
+--------+--------+ | |
| | |
| | |
+--------v--------+ | |
+-------------| PREPARED |--------------+ | |
| +-----^---------+-+ | | |
| | | | | | |
| | | | | | |
| | | | | | |
+------V-----------+ | | | +--------v----------+ | |
| WAITING |-------+ | | | CLAIMED | | |
+------+-----+-----+ | | +--+----------+-----+ | |
| | | | | | | |
| | | | | | | |
| | | | | | | |
| | | +-v----------v----+ | | |
| +-------------------->| EXPIRED | | | |
| | +--+--------------+ | | |
| | | | | |
| | | | | |
| +--------V-------V-+ | | |
+------------>| CLOSING |
Metrics
tryAcquire: opstats. latency spent on try locking operations. it includes timeouts.
tryTimeouts: counter. the number of timeouts on try locking operations
unlock: opstats. latency spent on unlock operations.
[中]给定zookeeper会话下的锁。这是一次性锁。它不可重复使用:如果锁定失败,如果zookeeper会话过期,如果调用#unlock,它将转换为过期或关闭状态。锁定程序如下所述。
0.如果是即时锁,它会首先得到锁服务人员。如果锁已经被某人持有。如果使用com,它将立即失败。啁啾分布式日志。例外。OwnershipAcquireFailedException与当前所有者。如果没有锁侍者,它将从1开始锁程序。1.准备:创建一个顺序znode来识别锁。2.检查锁侍者:准备好后让所有锁侍者检查。如果是第一个侍者,要求所有权;如果不是第一个服务员,但第一个服务员本身(相同的客户id和相同的会话id)也拥有所有权;否则,它会让观察者监视它的兄弟,并等待它消失。
+-----------------+
| INIT | ------------------------------+
+--------+--------+ |
| |
| |
+--------v--------+ |
| PREPARING |----------------------------+ |
+--------+--------+ | |
| | |
| | |
+--------v--------+ | |
+-------------| PREPARED |--------------+ | |
| +-----^---------+-+ | | |
| | | | | | |
| | | | | | |
| | | | | | |
+------V-----------+ | | | +--------v----------+ | |
| WAITING |-------+ | | | CLAIMED | | |
+------+-----+-----+ | | +--+----------+-----+ | |
| | | | | | | |
| | | | | | | |
| | | | | | | |
| | | +-v----------v----+ | | |
| +-------------------->| EXPIRED | | | |
| | +--+--------------+ | | |
| | | | | |
| | | | | |
| +--------V-------V-+ | | |
+------------>| CLOSING |
Metrics
tryAcquire: opstats. latency spent on try locking operations. it includes timeouts.
tryTimeouts: counter. the number of timeouts on try locking operations
unlock: opstats. latency spent on unlock operations.
代码示例来源:origin: twitter/distributedlog
private void handleNodeDelete(int lockEpoch, final WatchedEvent event) {
executeLockAction(lockEpoch, new LockAction() {
@Override
public void execute() {
// The lock is either expired or closed
if (!lockState.inState(State.WAITING)) {
LOG.info("{} ignore watched node {} deleted event, since lock state has moved to {}.",
new Object[] { lockId, event.getPath(), lockState.getState() });
return;
}
lockState.transition(State.PREPARED);
// we don't need to wait and check the result, since:
// 1) if it claimed the ownership, it would notify the waiters when claimed ownerships
// 2) if it failed, it would also notify the waiters, the waiters would cleanup the state.
checkLockOwnerAndWaitIfPossible(watcher, true);
}
@Override
public String getActionName() {
return "handleNodeDelete(path=" + event.getPath() + ")";
}
});
}
代码示例来源:origin: twitter/distributedlog
public int compare(String o1, String o2) {
int l1 = parseMemberID(o1);
int l2 = parseMemberID(o2);
return l1 - l2;
}
};
代码示例来源:origin: twitter/distributedlog
@Override
public void tryLock(long timeout, TimeUnit unit) throws LockingException {
final Stopwatch stopwatch = Stopwatch.createStarted();
Future<LockWaiter> tryFuture = asyncTryLock(timeout, unit);
LockWaiter waiter = waitForTry(stopwatch, tryFuture);
boolean acquired = waiter.waitForAcquireQuietly();
if (!acquired) {
throw new OwnershipAcquireFailedException(lockPath, waiter.getCurrentOwner());
}
}
代码示例来源:origin: twitter/distributedlog
@Test(timeout = 60000)
public void testLockWhenPreviousLockZnodeStillExists() throws Exception {
String lockPath = "/test-lock-when-previous-lock-znode-still-exists-" +
System.currentTimeMillis();
String clientId = "client-id";
ZooKeeper zk = zkc.get();
createLockPath(zk, lockPath);
final ZKSessionLock lock0 = new ZKSessionLock(zkc0, lockPath, clientId, lockStateExecutor);
// lock0 lock
lock0.tryLock(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
// simulate lock0 expires but znode still exists
final DistributedLockContext context1 = new DistributedLockContext();
context1.addLockId(lock0.getLockId());
final ZKSessionLock lock1 = new ZKSessionLock(zkc, lockPath, clientId, lockStateExecutor,
60000, NullStatsLogger.INSTANCE, context1);
lock1.tryLock(0L, TimeUnit.MILLISECONDS);
assertEquals(State.CLAIMED, lock1.getLockState());
lock1.unlock();
final DistributedLockContext context2 = new DistributedLockContext();
context2.addLockId(lock0.getLockId());
final ZKSessionLock lock2 = new ZKSessionLock(zkc, lockPath, clientId, lockStateExecutor,
60000, NullStatsLogger.INSTANCE, context2);
lock2.tryLock(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
assertEquals(State.CLAIMED, lock2.getLockState());
lock2.unlock();
lock0.unlock();
}
代码示例来源:origin: twitter/distributedlog
assertTrue(lock0.haveLock());
assertFalse(lock1.haveLock());
assertEquals(((ZKSessionLock) lock0.getInternalLock()).getLockId(),
Await.result(asyncParseClientID(zkc0.get(), lockPath, children.get(0))));
assertEquals(((ZKSessionLock) lock1.getInternalLock()).getLockId(),
Await.result(asyncParseClientID(zkc.get(), lockPath, children.get(1))));
fail("Should fail check write lock since lock is already held by other people");
} catch (OwnershipAcquireFailedException oafe) {
assertEquals(((ZKSessionLock) lock1.getInternalLock()).getLockId().getLeft(),
oafe.getCurrentOwner());
fail("Should fail check write lock since lock is already held by other people");
} catch (OwnershipAcquireFailedException oafe) {
assertEquals(((ZKSessionLock) lock1.getInternalLock()).getLockId().getLeft(),
oafe.getCurrentOwner());
fail("Should fail check write lock since lock is already held by other people");
} catch (OwnershipAcquireFailedException oafe) {
assertEquals(((ZKSessionLock) lock1.getInternalLock()).getLockId().getLeft(),
oafe.getCurrentOwner());
fail("Should fail check write lock since lock is already held by other people");
} catch (OwnershipAcquireFailedException oafe) {
assertEquals(((ZKSessionLock) lock1.getInternalLock()).getLockId().getLeft(),
oafe.getCurrentOwner());
assertFalse(lock0.haveLock());
assertTrue(lock1.haveLock());
代码示例来源:origin: twitter/distributedlog
String clientId = "test-lock-on-non-existed-lock";
ZKSessionLock lock = new ZKSessionLock(zkc, lockPath, clientId, lockStateExecutor);
lock.tryLock(0, TimeUnit.MILLISECONDS);
fail("Should fail on locking a non-existed lock.");
} catch (LockingException le) {
assertEquals(KeeperException.Code.NONODE, ((KeeperException) cause).code());
assertEquals(State.CLOSED, lock.getLockState());
lock.tryLock(0, TimeUnit.MILLISECONDS);
fail("Should fail on locking a failure lock.");
} catch (LockStateChangedException lsce) {
assertEquals(State.CLOSED, lock.getLockState());
代码示例来源:origin: twitter/distributedlog
throw new LockingException(lockPath, "Timeout during try phase of lock acquire", toe);
} catch (Exception ex) {
String message = getLockId() + " failed to lock " + lockPath;
throw new LockingException(lockPath, message, ex);
} finally {
unlock();
代码示例来源:origin: twitter/distributedlog
new ZKSessionLock(zkc, lockPath, clientId, lockStateExecutor);
lock.executeLockAction(lock.getEpoch().get(), new LockAction() {
@Override
public void execute() {
lock.executeLockAction(lock.getEpoch().get() + 1, new LockAction() {
@Override
public void execute() {
lock.executeLockAction(lock.getEpoch().get(), new LockAction() {
@Override
public void execute() {
lock.executeLockAction(lock.getEpoch().get() + 1, new LockAction() {
@Override
public void execute() {
代码示例来源:origin: twitter/distributedlog
final ZKSessionLock lock = new ZKSessionLock(zkc, lockPath, clientId, lockStateExecutor)
.setLockListener(listener);
expiredLatch.await();
assertEquals(State.INIT, lock.getLockState());
try {
lock.tryLock(timeout, TimeUnit.MILLISECONDS);
fail("Should fail locking using an expired lock");
} catch (LockingException le) {
assertTrue(le.getCause() instanceof KeeperException.SessionExpiredException);
assertEquals(State.CLOSED, lock.getLockState());
List<String> children = getLockWaiters(zkc, lockPath);
assertEquals(0, children.size());
代码示例来源:origin: twitter/distributedlog
@Test(timeout = 60000)
public void testParseClientID() throws Exception {
ZooKeeper zk = zkc.get();
String lockPath = "/test-parse-clientid";
String clientId = "test-parse-clientid-" + System.currentTimeMillis();
Pair<String, Long> lockId = Pair.of(clientId, zk.getSessionId());
createLockPath(zk, lockPath);
// Correct data
String node1 = getLockIdFromPath(createLockNodeV1(zk, lockPath, clientId));
String node2 = getLockIdFromPath(createLockNodeV2(zk, lockPath, clientId));
String node3 = getLockIdFromPath(createLockNodeV3(zk, lockPath, clientId));
assertEquals(lockId, Await.result(asyncParseClientID(zk, lockPath, node1)));
assertEquals(lockId, Await.result(asyncParseClientID(zk, lockPath, node2)));
assertEquals(lockId, Await.result(asyncParseClientID(zk, lockPath, node3)));
// Bad Lock Node Name
String node4 = getLockIdFromPath(createLockNodeWithBadNodeName(zk, lockPath, clientId, "member"));
String node5 = getLockIdFromPath(createLockNodeWithBadNodeName(zk, lockPath, clientId, "member_badnode"));
String node6 = getLockIdFromPath(createLockNodeWithBadNodeName(zk, lockPath, clientId, "member_badnode_badnode"));
String node7 = getLockIdFromPath(createLockNodeWithBadNodeName(zk, lockPath, clientId, "member_badnode_badnode_badnode"));
String node8 = getLockIdFromPath(createLockNodeWithBadNodeName(zk, lockPath, clientId, "member_badnode_badnode_badnode_badnode"));
assertEquals(lockId, Await.result(asyncParseClientID(zk, lockPath, node4)));
assertEquals(lockId, Await.result(asyncParseClientID(zk, lockPath, node5)));
assertEquals(lockId, Await.result(asyncParseClientID(zk, lockPath, node6)));
assertEquals(lockId, Await.result(asyncParseClientID(zk, lockPath, node7)));
assertEquals(lockId, Await.result(asyncParseClientID(zk, lockPath, node8)));
// Malformed Node Name
String node9 = getLockIdFromPath(createLockNodeWithBadNodeName(zk, lockPath, clientId, "member_malformed_s12345678_999999"));
assertEquals(Pair.of("malformed", 12345678L), Await.result(asyncParseClientID(zk, lockPath, node9)));
}
代码示例来源:origin: twitter/distributedlog
@Override
public Future<BoxedUnit> asyncUnlock() {
return asyncUnlock(new LockClosedException(lockPath, lockId, lockState.getState()));
}
代码示例来源:origin: twitter/distributedlog
private Future<String> checkLockOwnerAndWaitIfPossible(final LockWatcher lockWatcher,
final boolean wait) {
final Promise<String> promise = new Promise<String>();
checkLockOwnerAndWaitIfPossible(lockWatcher, wait, promise);
return promise;
}
代码示例来源:origin: twitter/distributedlog
final boolean wait = DistributedLogConstants.LOCK_IMMEDIATE != timeout;
if (wait) {
asyncTryLock(wait, result);
} else {
代码示例来源:origin: twitter/distributedlog
asyncTryLockWithoutCleanup(wait, lockResult);
代码示例来源:origin: twitter/distributedlog
/**
* Test lock after unlock is called.
*
* @throws Exception
*/
@Test(timeout = 60000)
public void testLockAfterUnlock() throws Exception {
String lockPath = "/test-lock-after-unlock";
String clientId = "test-lock-after-unlock";
ZKSessionLock lock = new ZKSessionLock(zkc, lockPath, clientId, lockStateExecutor);
lock.unlock();
assertEquals(State.CLOSED, lock.getLockState());
try {
lock.tryLock(0, TimeUnit.MILLISECONDS);
fail("Should fail on tryLock since lock state has changed.");
} catch (LockStateChangedException lsce) {
// expected
}
assertEquals(State.CLOSED, lock.getLockState());
try {
lock.tryLock(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
fail("Should fail on tryLock immediately if lock state has changed.");
} catch (LockStateChangedException lsce) {
// expected
}
assertEquals(State.CLOSED, lock.getLockState());
}
代码示例来源:origin: twitter/distributedlog
FutureUtils.result(lock0.asyncAcquire());
Pair<String, Long> lockId0_1 = ((ZKSessionLock) lock0.getInternalLock()).getLockId();
assertTrue(lock0.haveLock());
assertEquals(lockId0_1,
Await.result(asyncParseClientID(zkc0.get(), lockPath, children.get(0))));
Pair<String, Long> lockId0_2 = ((ZKSessionLock) lock0.getInternalLock()).getLockId();
assertFalse("New lock should be created under different session", lockId0_1.equals(lockId0_2));
assertTrue(lock0.haveLock());
assertEquals(lockId0_2,
Await.result(asyncParseClientID(zkc0.get(), lockPath, children.get(0))));
代码示例来源:origin: twitter/distributedlog
/**
* Test try-create after close race condition.
*
* @throws Exception
*/
@Test(timeout = 60000)
public void testTryCloseRaceCondition() throws Exception {
String name = testNames.getMethodName();
String lockPath = "/" + name;
String clientId = name;
createLockPath(zkc.get(), lockPath);
ZKSessionLock lock = new ZKSessionLock(
zkc, lockPath, clientId, lockStateExecutor,
1*1000 /* op timeout */, NullStatsLogger.INSTANCE,
new DistributedLockContext());
try {
FailpointUtils.setFailpoint(FailpointUtils.FailPointName.FP_LockTryCloseRaceCondition,
FailpointUtils.DEFAULT_ACTION);
lock.tryLock(0, TimeUnit.MILLISECONDS);
} catch (LockClosedException ex) {
;
} finally {
FailpointUtils.removeFailpoint(FailpointUtils.FailPointName.FP_LockTryCloseRaceCondition);
}
assertEquals(State.CLOSED, lock.getLockState());
List<String> children = getLockWaiters(zkc, lockPath);
assertEquals(0, children.size());
}
代码示例来源:origin: twitter/distributedlog
@Override
public void unlock() {
Future<BoxedUnit> unlockResult = asyncUnlock();
try {
Await.result(unlockResult, Duration.fromMilliseconds(lockOpTimeout));
} catch (TimeoutException toe) {
// This shouldn't happen unless we lose a watch, and may result in a leaked lock.
LOG.error("Timeout unlocking {} owned by {} : ", new Object[] { lockPath, lockId, toe });
} catch (Exception e) {
LOG.warn("{} failed to unlock {} : ", new Object[] { lockId, lockPath, e });
}
}
代码示例来源:origin: twitter/distributedlog
ZKSessionLock lock0 = new ZKSessionLock(zkc0, lockPath, clientId0, lockStateExecutor);
ZKSessionLock lock1 = new ZKSessionLock(zkc, lockPath, clientId1, lockStateExecutor);
lock0.tryLock(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
assertEquals(State.CLAIMED, lock0.getLockState());
List<String> children = getLockWaiters(zkc0, lockPath);
assertEquals(1, children.size());
assertEquals(lock0.getLockId(), Await.result(asyncParseClientID(zkc0.get(), lockPath, children.get(0))));
lock1.tryLock(timeout, TimeUnit.MILLISECONDS);
fail("lock1 should fail on locking since lock0 is holding the lock.");
} catch (OwnershipAcquireFailedException oafe) {
assertEquals(lock0.getLockId().getLeft(), oafe.getCurrentOwner());
assertEquals(State.CLAIMED, lock0.getLockState());
assertEquals(State.CLOSED, lock1.getLockState());
children = getLockWaiters(zkc0, lockPath);
assertEquals(1, children.size());
assertEquals(lock0.getLockId(), Await.result(asyncParseClientID(zkc0.get(), lockPath, children.get(0))));
lock0.unlock();
assertEquals(State.CLOSED, lock0.getLockState());
assertEquals(0, getLockWaiters(zkc, lockPath).size());
ZKSessionLock lock2 = new ZKSessionLock(zkc, lockPath, clientId2, lockStateExecutor);
lock2.tryLock(timeout, TimeUnit.MILLISECONDS);
代码示例来源:origin: twitter/distributedlog
FutureUtils.result(lock0.asyncAcquire());
Pair<String, Long> lockId0_1 = ((ZKSessionLock) lock0.getInternalLock()).getLockId();
assertTrue(lock0.haveLock());
assertEquals(lockId0_1,
Await.result(asyncParseClientID(zkc0.get(), lockPath, children.get(0))));
assertEquals(1, children.size());
assertTrue(lock0.haveLock());
Pair<String, Long> lock0_2 = ((ZKSessionLock) lock0.getInternalLock()).getLockId();
assertEquals(lock0_2,
Await.result(asyncParseClientID(zkc.get(), lockPath, children.get(0))));
assertEquals(clientId, lock0_2.getLeft());
assertFalse(lockId0_1.equals(lock0_2));
我尝试理解[c代码 -> 汇编]代码 void node::Check( data & _data1, vector& _data2) { -> push ebp -> mov ebp,esp ->
我需要在当前表单(代码)的上下文中运行文本文件中的代码。其中一项要求是让代码创建新控件并将其添加到当前窗体。 例如,在Form1.cs中: using System.Windows.Forms; ..
我有此 C++ 代码并将其转换为 C# (.net Framework 4) 代码。有没有人给我一些关于 malloc、free 和 sprintf 方法的提示? int monate = ee; d
我的网络服务器代码有问题 #include #include #include #include #include #include #include int
给定以下 html 代码,将列表中的第三个元素(即“美丽”一词)以斜体显示的 CSS 代码是什么?当然,我可以给这个元素一个 id 或一个 class,但 html 代码必须保持不变。谢谢
关闭。这个问题不符合Stack Overflow guidelines .它目前不接受答案。 我们不允许提问寻求书籍、工具、软件库等的推荐。您可以编辑问题,以便用事实和引用来回答。 关闭 7 年前。
我试图制作一个宏来避免重复代码和注释。 我试过这个: #define GrowOnPage(any Page, any Component) Component.Width := Page.Surfa
我正在尝试将我的旧 C++ 代码“翻译”成头条新闻所暗示的 C# 代码。问题是我是 C# 中的新手,并不是所有的东西都像 C++ 中那样。在 C++ 中这些解决方案运行良好,但在 C# 中只是不能。我
在 Windows 10 上工作,R 语言的格式化程序似乎没有在 Visual Studio Code 中完成它的工作。我试过R support for Visual Studio Code和 R-T
我正在处理一些报告(计数),我必须获取不同参数的计数。非常简单但乏味。 一个参数的示例查询: qCountsEmployee = ( "select count(*) from %s wher
最近几天我尝试从 d00m 调试网络错误。我开始用尽想法/线索,我希望其他 SO 用户拥有可能有用的宝贵经验。我希望能够提供所有相关信息,但我个人无法控制服务器环境。 整个事情始于用户注意到我们应用程
我有一个 app.js 文件,其中包含如下 dojo amd 模式代码: require(["dojo/dom", ..], function(dom){ dom.byId('someId').i
我对“-gencode”语句中的“code=sm_X”选项有点困惑。 一个例子:NVCC 编译器选项有什么作用 -gencode arch=compute_13,code=sm_13 嵌入库中? 只有
我为我的表格使用 X-editable 框架。 但是我有一些问题。 $(document).ready(function() { $('.access').editable({
我一直在通过本教程学习 flask/python http://blog.miguelgrinberg.com/post/the-flask-mega-tutorial-part-i-hello-wo
我想将 Vim 和 EMACS 用于 CNC、G 代码和 M 代码。 Vim 或 EMACS 是否有任何语法或模式来处理这种类型的代码? 最佳答案 一些快速搜索使我找到了 this vim 和 thi
关闭。这个问题不符合Stack Overflow guidelines .它目前不接受答案。 想改进这个问题?更新问题,使其成为 on-topic对于堆栈溢出。 7年前关闭。 Improve this
这个问题在这里已经有了答案: Enabling markdown highlighting in Vim (5 个回答) 6年前关闭。 当我在 Vim 中编辑包含 Markdown 代码的 READM
我正在 Swift3 iOS 中开发视频应用程序。基本上我必须将视频 Assets 和音频与淡入淡出效果合并为一个并将其保存到 iPhone 画廊。为此,我使用以下方法: private func d
pipeline { agent any stages { stage('Build') { steps { e
我是一名优秀的程序员,十分优秀!