gpt4 book ai didi

java - Apache Curator 双重锁定问题与多个服务

转载 作者:塔克拉玛干 更新时间:2023-11-02 08:30:39 26 4
gpt4 key购买 nike

我目前正在使用 Apache Curator 来外部化共享资源(数据库中的一行)的锁定。总结一下这个问题,我正在运行一个服务的 2 个实例(使用 Spring Boot),我们将其称为服务 A,并调用部署在不同区域的实例 A1 和 A2。我锁定了共享数据库中代表文件的表的 ID(主键)。

在服务 A 的代码中,我创建了一个单例 (BaseLockService) 来处理项目中的所有锁定。这也意味着对于 2 个正在运行的实例,它们每个都包含一个用于处理锁定的单例。我使用的食谱是 Shared Reentrant Lock它使用的是 InterProcessMutex 类,但是从来没有使用可重入锁的情况。它的描述最接近我的需要。

运行的主进程是@Scheduled 进程,执行时间之间有 30 秒的延迟。此外,我为 ThreadPoolTask​​Scheduler 创建了一个 bean,它将 UUID 附加到线程名称,池大小为 1。这个 UUID 的原因是因为没有它,当 A1 和 A2 同时运行时,它们都包含一个名为“task-scheduler-1”的线程。这最初引起了我的问题使用锁定,因为 A1 可能拥有锁,然后在处理文件的同时,A2 请求锁,因为它们共享相同的名称,Curator 在 lock.acquire() 上返回 true,因此两个实例拥有相同的锁。

运行一个实例时,这不是问题。我在 ZooKeeper 中看到正在创建 ZNode,并且我看到 Curator 为临时锁生成的 UUID。当运行两个或多个实例时,进程有时会进入 A1 拥有锁的竞争状态,然后运行一个冗长的进程。然后 A2 以某种方式获得了锁,快速完成该过程并释放锁。然后当 A1 完成并尝试解锁时,我得到以下异常:

[2019-07-09 21:53:54,485] ERROR [08c598b9-7254-408c-8ed2-0e5849ca2b19_task-scheduler-1] c.m.c.myApp.lock.BaseLockService.unlock - Can't unlock lock #com.myApp.lock.BaseLockService$LockableHandle@4ca8ddab
org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /myapp/lock/files/1376112
at org.apache.zookeeper.KeeperException.create(KeeperException.java:111)
at org.apache.zookeeper.KeeperException.create(KeeperException.java:51)
at org.apache.zookeeper.ZooKeeper.delete(ZooKeeper.java:873)
at org.apache.curator.framework.imps.DeleteBuilderImpl$5.call(DeleteBuilderImpl.java:274)
at org.apache.curator.framework.imps.DeleteBuilderImpl$5.call(DeleteBuilderImpl.java:268)
at org.apache.curator.connection.StandardConnectionHandlingPolicy.callWithRetry(StandardConnectionHandlingPolicy.java:64)
at org.apache.curator.RetryLoop.callWithRetry(RetryLoop.java:100)
at org.apache.curator.framework.imps.DeleteBuilderImpl.pathInForeground(DeleteBuilderImpl.java:265)
at org.apache.curator.framework.imps.DeleteBuilderImpl.forPath(DeleteBuilderImpl.java:249)
at org.apache.curator.framework.imps.DeleteBuilderImpl.forPath(DeleteBuilderImpl.java:34)
at com.myApp.lock.BaseLockService.unlock(BaseLockService.java:174)
at com.myApp.lock.BaseLockService.lambda$unlockAllIDs$0(BaseLockService.java:143)
at java.util.ArrayList.forEach(ArrayList.java:1257)
at com.myApp.lock.BaseLockService.unlockAllIDs(BaseLockService.java:139)

这是我复制这种情况的单元测试:

@Test
public void baseLockTest() {
List<Lockable> filesToProcess = new ArrayList<>();

//For now only 1 to limit complexity
Lockable fileToLock = FileSource.builder()
.id(1)
.build();

filesToProcess.add(fileToLock);

Runnable task = () -> {
log.info("ATTEMPT LOCK");
Set<BaseLockService.LockableHandle> lockedBatch = lockService.lockBatch(filesToProcess, 1);

if (!lockedBatch.isEmpty()) {

try {
log.info("ATTEMPT FAKE PROCESS TIME SLEEP 100 MS");
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}

log.info("ATTEMPT UNLOCK");
lockService.unlockAll(lockedBatch);
}
};

System.out.println("**********************************************************");

//Simulate two Service instances of 1 thread
int totalThreads = 2;
ExecutorService executorService = Executors.newFixedThreadPool(totalThreads);

List<Future> locksProcessed = new ArrayList<>(totalThreads);
for (int i = 0; i < 1000; i++) {
locksProcessed.add(executorService.submit(task));
}

Future f;
while(!locksProcessed.isEmpty()){
Iterator<Future> iterator = locksProcessed.iterator();
while(iterator.hasNext()){
f = iterator.next();
if(f.isDone()){
iterator.remove();
}
}

}

System.out.println("ALL DONE!!!");
}

这是 BaseLockService 中的锁定和解锁方法:

    public Set<LockableHandle> lockBatch(final List<Lockable> desiredLock, final int batchSize) {
Set<LockableHandle> effectivelyLocked = new HashSet<>();
Iterator<Lockable> desiredLockIterator = desiredLock.iterator();

while ((desiredLockIterator.hasNext()) && (effectivelyLocked.size() <= batchSize)) {
Lockable toLock = desiredLockIterator.next();
String lockPath = ZKPaths.makePath(getLockPath(), String.valueOf(toLock.getId()));
InterProcessMutex lock = createMutex(lockPath);

try {
if (lock.acquire(0, TimeUnit.SECONDS)) {
LockableHandle handle = new LockableHandle(toLock, lock);
effectivelyLocked.add(handle);
locks.put(handle.getId(), handle);
} else {
log.warn(String.format("Object was not locked. Object id is %d, lock path is %s.",
toLock.getId(),
lockPath));
}
} catch (Exception e) {
log.error("Cannot lock path " + lockPath, e);
}
}

log.info(String.format("%d object(s) were requested to lock. %d were effectively locked.",
desiredLock.size(),
effectivelyLocked.size()));

return effectivelyLocked;
}

public void unlock(final LockableHandle lockHandle) {
boolean success = false;

try {
InterProcessMutex lock = lockHandle.getMutex();
if (lock != null) {
lock.release();
client.delete()
.deletingChildrenIfNeeded()
.forPath(ZKPaths.makePath(getLockPath(), String.valueOf(lockHandle.getId())));
success = true;
}
} catch (Exception e) {
log.error("Can't unlock lock #" + lockHandle, e);
} finally {
locks.remove(lockHandle.getId());
}

log.info(String.format("The lock #%d was requested to be unlocked. Success = %b",
lockHandle.getId(),
success));
}

这是服务实例化后调用的init()方法:

    public void init() {
log.info("Stating initialization of the Lock Service");
locks = new HashMap<>();
client = createClient();
client.start();

try {
client.blockUntilConnected();
if (client.isZk34CompatibilityMode()) {
log.info("The Curator Framework is running in ZooKeeper 3.4 compatibility mode.");
}
} catch (InterruptedException ie) {
log.error("Cannot connect to ZooKeeper.", ie);
}

log.info("Completed initialization of the Lock Service");
}
  • 我已经检查过连接问题,这不是问题。
  • 在日志中没有找到 RECONNECTED、LOST、SUSPENDED 消息。
  • 锁超时不是问题,因为除非 session /连接终止,否则 ZooKeeper 不会使任何锁过期。
  • 我尝试过 Curator 的其他食谱,但它们不适合我的需要。无论如何,他们也会抛出类似的异常。
  • Apache Curator 版本为 4.2.0,ZooKeeper 为 3.4.X

我不确定缺少什么,但没有任何选择。感谢您的任何意见/建议

最佳答案

我在 Locking Issue Example 中发现了很多问题你发送。这些可能是该示例特有的,但如果这些也在您的代码中,它将解释您所看到的问题。

  1. Maven POM 指定不正确。 Curator 需要知道它处于 ZK 3.4.x 兼容模式 - 方法是 described here . TL;DR 将 Zookeeper 从 Curator 依赖项中排除,并添加对 Zookeeper 3.4.x 的直接依赖项。
  2. BaseLockService 中的locks 字段应该是一个ConcurrentHashMap
  3. BaseLockService#unlock 正在尝试通过调用 client.delete()... 来清理锁定路径。这行不通。这种代码存在固有的竞争,这就是 Curator 拥有“Reaper”类的原因,也是我将容器节点插入 Zookeeper 3.5.x 的原因。请注意,正是这行代码产生了 NoNode 异常,而不是 Curator 锁定代码。我建议您删除该代码,不要担心它或迁移到 Zookeeper 3.5.x。
  4. 我认为 BaseLockService 不应该继续重新创建 InterProcessMutex。它应该保留他们的 map 或其他东西。

当我应用上面的 1-3 时,测试成功通过(我尝试了多次)。我开了一个PR on your test project有 3 个变化。

关于java - Apache Curator 双重锁定问题与多个服务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57014270/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com