Apache Curator 双重锁定问题与多个服务
Apache Curator double Locking issue with multiple services
我目前正在使用 Apache Curator 来外部化共享资源(数据库中的一行)的锁定。
总结一下这个问题,
我正在 运行 宁 2 个服务实例(使用 Spring 引导),让我们调用此服务 A,并让我们调用部署在不同区域的实例 A1 和 A2。
我在代表文件的共享数据库上锁定 table 的 ID(主键)。
在服务 A 的代码中,我创建了一个单例 (BaseLockService) 来处理项目中的所有锁定。这也意味着对于 2 个 运行ning 实例,它们每个都包含一个用于处理锁定的单例。我使用的方法是 Shared Reentrant Lock,它使用了 InterProcessMutex class,但是从来没有使用可重入锁的情况。是描述最接近我需要的class。
运行s的主进程是一个@Scheduled进程,执行之间有30秒的延迟。
此外,我为 ThreadPoolTaskScheduler 创建了一个 bean,它将 UUID 附加到线程名称,池大小为 1。
这个 UUID 的原因是因为如果没有它,当 A1 和 A2 运行 同时存在时,它们都包含一个名为 "task-scheduler-1" 的线程。这最初引起了我的问题
带锁,因为 A1 可能拥有锁,然后在处理文件的同时,A2 请求锁,因为它们共享相同的名称,Curator returns 在 lock.acquire() 上为真,因此有两个实例拥有相同的锁。
当 运行宁一个实例时,这不是问题。我在 ZooKeeper 中看到正在创建 ZNode,并且我看到 Curator 为临时锁生成的 UUID。
当 运行 连接两个或多个实例时,进程有时会进入 A1 拥有锁的竞争条件,然后 运行 是一个冗长的进程。然后 A2 以某种方式获得了锁,快速完成该过程并释放锁。然后当 A1 完成并尝试解锁时,我得到以下异常:
[2019-07-09 21:53:54,485] ERROR [08c598b9-7254-408c-8ed2-0e5849ca2b19_task-scheduler-1] c.m.c.myApp.lock.BaseLockService.unlock - Can't unlock lock #com.myApp.lock.BaseLockService$LockableHandle@4ca8ddab
org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /myapp/lock/files/1376112
at org.apache.zookeeper.KeeperException.create(KeeperException.java:111)
at org.apache.zookeeper.KeeperException.create(KeeperException.java:51)
at org.apache.zookeeper.ZooKeeper.delete(ZooKeeper.java:873)
at org.apache.curator.framework.imps.DeleteBuilderImpl.call(DeleteBuilderImpl.java:274)
at org.apache.curator.framework.imps.DeleteBuilderImpl.call(DeleteBuilderImpl.java:268)
at org.apache.curator.connection.StandardConnectionHandlingPolicy.callWithRetry(StandardConnectionHandlingPolicy.java:64)
at org.apache.curator.RetryLoop.callWithRetry(RetryLoop.java:100)
at org.apache.curator.framework.imps.DeleteBuilderImpl.pathInForeground(DeleteBuilderImpl.java:265)
at org.apache.curator.framework.imps.DeleteBuilderImpl.forPath(DeleteBuilderImpl.java:249)
at org.apache.curator.framework.imps.DeleteBuilderImpl.forPath(DeleteBuilderImpl.java:34)
at com.myApp.lock.BaseLockService.unlock(BaseLockService.java:174)
at com.myApp.lock.BaseLockService.lambda$unlockAllIDs[=10=](BaseLockService.java:143)
at java.util.ArrayList.forEach(ArrayList.java:1257)
at com.myApp.lock.BaseLockService.unlockAllIDs(BaseLockService.java:139)
这是我复制这种情况的单元测试:
@Test
public void baseLockTest() {
List<Lockable> filesToProcess = new ArrayList<>();
//For now only 1 to limit complexity
Lockable fileToLock = FileSource.builder()
.id(1)
.build();
filesToProcess.add(fileToLock);
Runnable task = () -> {
log.info("ATTEMPT LOCK");
Set<BaseLockService.LockableHandle> lockedBatch = lockService.lockBatch(filesToProcess, 1);
if (!lockedBatch.isEmpty()) {
try {
log.info("ATTEMPT FAKE PROCESS TIME SLEEP 100 MS");
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("ATTEMPT UNLOCK");
lockService.unlockAll(lockedBatch);
}
};
System.out.println("**********************************************************");
//Simulate two Service instances of 1 thread
int totalThreads = 2;
ExecutorService executorService = Executors.newFixedThreadPool(totalThreads);
List<Future> locksProcessed = new ArrayList<>(totalThreads);
for (int i = 0; i < 1000; i++) {
locksProcessed.add(executorService.submit(task));
}
Future f;
while(!locksProcessed.isEmpty()){
Iterator<Future> iterator = locksProcessed.iterator();
while(iterator.hasNext()){
f = iterator.next();
if(f.isDone()){
iterator.remove();
}
}
}
System.out.println("ALL DONE!!!");
}
这是 BaseLockService 中的锁定和解锁方法:
public Set<LockableHandle> lockBatch(final List<Lockable> desiredLock, final int batchSize) {
Set<LockableHandle> effectivelyLocked = new HashSet<>();
Iterator<Lockable> desiredLockIterator = desiredLock.iterator();
while ((desiredLockIterator.hasNext()) && (effectivelyLocked.size() <= batchSize)) {
Lockable toLock = desiredLockIterator.next();
String lockPath = ZKPaths.makePath(getLockPath(), String.valueOf(toLock.getId()));
InterProcessMutex lock = createMutex(lockPath);
try {
if (lock.acquire(0, TimeUnit.SECONDS)) {
LockableHandle handle = new LockableHandle(toLock, lock);
effectivelyLocked.add(handle);
locks.put(handle.getId(), handle);
} else {
log.warn(String.format("Object was not locked. Object id is %d, lock path is %s.",
toLock.getId(),
lockPath));
}
} catch (Exception e) {
log.error("Cannot lock path " + lockPath, e);
}
}
log.info(String.format("%d object(s) were requested to lock. %d were effectively locked.",
desiredLock.size(),
effectivelyLocked.size()));
return effectivelyLocked;
}
public void unlock(final LockableHandle lockHandle) {
boolean success = false;
try {
InterProcessMutex lock = lockHandle.getMutex();
if (lock != null) {
lock.release();
client.delete()
.deletingChildrenIfNeeded()
.forPath(ZKPaths.makePath(getLockPath(), String.valueOf(lockHandle.getId())));
success = true;
}
} catch (Exception e) {
log.error("Can't unlock lock #" + lockHandle, e);
} finally {
locks.remove(lockHandle.getId());
}
log.info(String.format("The lock #%d was requested to be unlocked. Success = %b",
lockHandle.getId(),
success));
}
这是服务实例化后调用的init()方法:
public void init() {
log.info("Stating initialization of the Lock Service");
locks = new HashMap<>();
client = createClient();
client.start();
try {
client.blockUntilConnected();
if (client.isZk34CompatibilityMode()) {
log.info("The Curator Framework is running in ZooKeeper 3.4 compatibility mode.");
}
} catch (InterruptedException ie) {
log.error("Cannot connect to ZooKeeper.", ie);
}
log.info("Completed initialization of the Lock Service");
}
- 我已经检查过连接问题,这不是问题。
- 在日志中未找到重新连接、丢失、暂停的消息。
- 锁超时不是问题,因为除非 session/connection 死亡,否则 ZooKeeper 不会使任何锁过期。
- 我试过其他 Curator 食谱,但它们不适合我的需要。他们也抛出类似的异常。
- Apache Curator 版本是 4.2.0,ZooKeeper 是 3.4.X
我不确定缺少什么,但没有任何选择。
感谢任何 comments/suggestions
我在您发送的 Locking Issue Example 中发现了一些问题。这些可能是该示例特有的,但如果这些也在您的代码中,它将解释您遇到的问题。
- Maven POM 指定不正确。 Curator 需要知道它处于 ZK 3.4.x 兼容模式 - 执行此操作的方法是 described here。 TL;DR 将 Zookeeper 从 Curator 依赖项中排除,并向 Zookeeper 3 添加直接依赖项。4.x.
BaseLockService
中的 locks
字段应该是 ConcurrentHashMap
BaseLockService#unlock
正在尝试通过调用 client.delete()...
来清理锁定路径。这行不通。这种代码存在固有的竞争,这就是为什么 Curator 有 "Reaper" 类 以及为什么我将容器节点推入 Zookeeper 3.5.x 的原因。请注意,正是这行代码产生了 NoNode
异常,而不是 Curator 锁定代码。我建议您摆脱该代码,不要担心它或移至 Zookeeper 3。5.x.
- 我认为
BaseLockService
不应该保留 re-creating InterProcessMutex
。它应该保留它们的地图或其他东西。
当我应用上面的 1-3 时,测试成功通过(我尝试了多次)。我已经打开了一个 PR on your test project 并进行了 3 次更改。
我目前正在使用 Apache Curator 来外部化共享资源(数据库中的一行)的锁定。 总结一下这个问题, 我正在 运行 宁 2 个服务实例(使用 Spring 引导),让我们调用此服务 A,并让我们调用部署在不同区域的实例 A1 和 A2。 我在代表文件的共享数据库上锁定 table 的 ID(主键)。
在服务 A 的代码中,我创建了一个单例 (BaseLockService) 来处理项目中的所有锁定。这也意味着对于 2 个 运行ning 实例,它们每个都包含一个用于处理锁定的单例。我使用的方法是 Shared Reentrant Lock,它使用了 InterProcessMutex class,但是从来没有使用可重入锁的情况。是描述最接近我需要的class。
运行s的主进程是一个@Scheduled进程,执行之间有30秒的延迟。 此外,我为 ThreadPoolTaskScheduler 创建了一个 bean,它将 UUID 附加到线程名称,池大小为 1。 这个 UUID 的原因是因为如果没有它,当 A1 和 A2 运行 同时存在时,它们都包含一个名为 "task-scheduler-1" 的线程。这最初引起了我的问题 带锁,因为 A1 可能拥有锁,然后在处理文件的同时,A2 请求锁,因为它们共享相同的名称,Curator returns 在 lock.acquire() 上为真,因此有两个实例拥有相同的锁。
当 运行宁一个实例时,这不是问题。我在 ZooKeeper 中看到正在创建 ZNode,并且我看到 Curator 为临时锁生成的 UUID。 当 运行 连接两个或多个实例时,进程有时会进入 A1 拥有锁的竞争条件,然后 运行 是一个冗长的进程。然后 A2 以某种方式获得了锁,快速完成该过程并释放锁。然后当 A1 完成并尝试解锁时,我得到以下异常:
[2019-07-09 21:53:54,485] ERROR [08c598b9-7254-408c-8ed2-0e5849ca2b19_task-scheduler-1] c.m.c.myApp.lock.BaseLockService.unlock - Can't unlock lock #com.myApp.lock.BaseLockService$LockableHandle@4ca8ddab
org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /myapp/lock/files/1376112
at org.apache.zookeeper.KeeperException.create(KeeperException.java:111)
at org.apache.zookeeper.KeeperException.create(KeeperException.java:51)
at org.apache.zookeeper.ZooKeeper.delete(ZooKeeper.java:873)
at org.apache.curator.framework.imps.DeleteBuilderImpl.call(DeleteBuilderImpl.java:274)
at org.apache.curator.framework.imps.DeleteBuilderImpl.call(DeleteBuilderImpl.java:268)
at org.apache.curator.connection.StandardConnectionHandlingPolicy.callWithRetry(StandardConnectionHandlingPolicy.java:64)
at org.apache.curator.RetryLoop.callWithRetry(RetryLoop.java:100)
at org.apache.curator.framework.imps.DeleteBuilderImpl.pathInForeground(DeleteBuilderImpl.java:265)
at org.apache.curator.framework.imps.DeleteBuilderImpl.forPath(DeleteBuilderImpl.java:249)
at org.apache.curator.framework.imps.DeleteBuilderImpl.forPath(DeleteBuilderImpl.java:34)
at com.myApp.lock.BaseLockService.unlock(BaseLockService.java:174)
at com.myApp.lock.BaseLockService.lambda$unlockAllIDs[=10=](BaseLockService.java:143)
at java.util.ArrayList.forEach(ArrayList.java:1257)
at com.myApp.lock.BaseLockService.unlockAllIDs(BaseLockService.java:139)
这是我复制这种情况的单元测试:
@Test
public void baseLockTest() {
List<Lockable> filesToProcess = new ArrayList<>();
//For now only 1 to limit complexity
Lockable fileToLock = FileSource.builder()
.id(1)
.build();
filesToProcess.add(fileToLock);
Runnable task = () -> {
log.info("ATTEMPT LOCK");
Set<BaseLockService.LockableHandle> lockedBatch = lockService.lockBatch(filesToProcess, 1);
if (!lockedBatch.isEmpty()) {
try {
log.info("ATTEMPT FAKE PROCESS TIME SLEEP 100 MS");
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("ATTEMPT UNLOCK");
lockService.unlockAll(lockedBatch);
}
};
System.out.println("**********************************************************");
//Simulate two Service instances of 1 thread
int totalThreads = 2;
ExecutorService executorService = Executors.newFixedThreadPool(totalThreads);
List<Future> locksProcessed = new ArrayList<>(totalThreads);
for (int i = 0; i < 1000; i++) {
locksProcessed.add(executorService.submit(task));
}
Future f;
while(!locksProcessed.isEmpty()){
Iterator<Future> iterator = locksProcessed.iterator();
while(iterator.hasNext()){
f = iterator.next();
if(f.isDone()){
iterator.remove();
}
}
}
System.out.println("ALL DONE!!!");
}
这是 BaseLockService 中的锁定和解锁方法:
public Set<LockableHandle> lockBatch(final List<Lockable> desiredLock, final int batchSize) {
Set<LockableHandle> effectivelyLocked = new HashSet<>();
Iterator<Lockable> desiredLockIterator = desiredLock.iterator();
while ((desiredLockIterator.hasNext()) && (effectivelyLocked.size() <= batchSize)) {
Lockable toLock = desiredLockIterator.next();
String lockPath = ZKPaths.makePath(getLockPath(), String.valueOf(toLock.getId()));
InterProcessMutex lock = createMutex(lockPath);
try {
if (lock.acquire(0, TimeUnit.SECONDS)) {
LockableHandle handle = new LockableHandle(toLock, lock);
effectivelyLocked.add(handle);
locks.put(handle.getId(), handle);
} else {
log.warn(String.format("Object was not locked. Object id is %d, lock path is %s.",
toLock.getId(),
lockPath));
}
} catch (Exception e) {
log.error("Cannot lock path " + lockPath, e);
}
}
log.info(String.format("%d object(s) were requested to lock. %d were effectively locked.",
desiredLock.size(),
effectivelyLocked.size()));
return effectivelyLocked;
}
public void unlock(final LockableHandle lockHandle) {
boolean success = false;
try {
InterProcessMutex lock = lockHandle.getMutex();
if (lock != null) {
lock.release();
client.delete()
.deletingChildrenIfNeeded()
.forPath(ZKPaths.makePath(getLockPath(), String.valueOf(lockHandle.getId())));
success = true;
}
} catch (Exception e) {
log.error("Can't unlock lock #" + lockHandle, e);
} finally {
locks.remove(lockHandle.getId());
}
log.info(String.format("The lock #%d was requested to be unlocked. Success = %b",
lockHandle.getId(),
success));
}
这是服务实例化后调用的init()方法:
public void init() {
log.info("Stating initialization of the Lock Service");
locks = new HashMap<>();
client = createClient();
client.start();
try {
client.blockUntilConnected();
if (client.isZk34CompatibilityMode()) {
log.info("The Curator Framework is running in ZooKeeper 3.4 compatibility mode.");
}
} catch (InterruptedException ie) {
log.error("Cannot connect to ZooKeeper.", ie);
}
log.info("Completed initialization of the Lock Service");
}
- 我已经检查过连接问题,这不是问题。
- 在日志中未找到重新连接、丢失、暂停的消息。
- 锁超时不是问题,因为除非 session/connection 死亡,否则 ZooKeeper 不会使任何锁过期。
- 我试过其他 Curator 食谱,但它们不适合我的需要。他们也抛出类似的异常。
- Apache Curator 版本是 4.2.0,ZooKeeper 是 3.4.X
我不确定缺少什么,但没有任何选择。 感谢任何 comments/suggestions
我在您发送的 Locking Issue Example 中发现了一些问题。这些可能是该示例特有的,但如果这些也在您的代码中,它将解释您遇到的问题。
- Maven POM 指定不正确。 Curator 需要知道它处于 ZK 3.4.x 兼容模式 - 执行此操作的方法是 described here。 TL;DR 将 Zookeeper 从 Curator 依赖项中排除,并向 Zookeeper 3 添加直接依赖项。4.x.
BaseLockService
中的locks
字段应该是ConcurrentHashMap
BaseLockService#unlock
正在尝试通过调用client.delete()...
来清理锁定路径。这行不通。这种代码存在固有的竞争,这就是为什么 Curator 有 "Reaper" 类 以及为什么我将容器节点推入 Zookeeper 3.5.x 的原因。请注意,正是这行代码产生了NoNode
异常,而不是 Curator 锁定代码。我建议您摆脱该代码,不要担心它或移至 Zookeeper 3。5.x.- 我认为
BaseLockService
不应该保留 re-creatingInterProcessMutex
。它应该保留它们的地图或其他东西。
当我应用上面的 1-3 时,测试成功通过(我尝试了多次)。我已经打开了一个 PR on your test project 并进行了 3 次更改。