Apache Curator 双重锁定问题与多个服务

Apache Curator double Locking issue with multiple services

我目前正在使用 Apache Curator 来外部化共享资源(数据库中的一行)的锁定。 总结一下这个问题, 我正在 运行 宁 2 个服务实例(使用 Spring 引导),让我们调用此服务 A,并让我们调用部署在不同区域的实例 A1 和 A2。 我在代表文件的共享数据库上锁定 table 的 ID(主键)。

在服务 A 的代码中,我创建了一个单例 (BaseLockService) 来处理项目中的所有锁定。这也意味着对于 2 个 运行ning 实例,它们每个都包含一个用于处理锁定的单例。我使用的方法是 Shared Reentrant Lock,它使用了 InterProcessMutex class,但是从来没有使用可重入锁的情况。是描述最接近我需要的class。

运行s的主进程是一个@Scheduled进程,执行之间有30秒的延迟。 此外,我为 ThreadPoolTask​​Scheduler 创建了一个 bean,它将 UUID 附加到线程名称,池大小为 1。 这个 UUID 的原因是因为如果没有它,当 A1 和 A2 运行 同时存在时,它们都包含一个名为 "task-scheduler-1" 的线程。这最初引起了我的问题 带锁,因为 A1 可能拥有锁,然后在处理文件的同时,A2 请求锁,因为它们共享相同的名称,Curator returns 在 lock.acquire() 上为真,因此有两个实例拥有相同的锁。

当 运行宁一个实例时,这不是问题。我在 ZooKeeper 中看到正在创建 ZNode,并且我看到 Curator 为临时锁生成的 UUID。 当 运行 连接两个或多个实例时,进程有时会进入 A1 拥有锁的竞争条件,然后 运行 是一个冗长的进程。然后 A2 以某种方式获得了锁,快速完成该过程并释放锁。然后当 A1 完成并尝试解锁时,我得到以下异常:

[2019-07-09 21:53:54,485] ERROR [08c598b9-7254-408c-8ed2-0e5849ca2b19_task-scheduler-1] c.m.c.myApp.lock.BaseLockService.unlock - Can't unlock lock #com.myApp.lock.BaseLockService$LockableHandle@4ca8ddab
org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /myapp/lock/files/1376112
    at org.apache.zookeeper.KeeperException.create(KeeperException.java:111)
    at org.apache.zookeeper.KeeperException.create(KeeperException.java:51)
    at org.apache.zookeeper.ZooKeeper.delete(ZooKeeper.java:873)
    at org.apache.curator.framework.imps.DeleteBuilderImpl.call(DeleteBuilderImpl.java:274)
    at org.apache.curator.framework.imps.DeleteBuilderImpl.call(DeleteBuilderImpl.java:268)
    at org.apache.curator.connection.StandardConnectionHandlingPolicy.callWithRetry(StandardConnectionHandlingPolicy.java:64)
    at org.apache.curator.RetryLoop.callWithRetry(RetryLoop.java:100)
    at org.apache.curator.framework.imps.DeleteBuilderImpl.pathInForeground(DeleteBuilderImpl.java:265)
    at org.apache.curator.framework.imps.DeleteBuilderImpl.forPath(DeleteBuilderImpl.java:249)
    at org.apache.curator.framework.imps.DeleteBuilderImpl.forPath(DeleteBuilderImpl.java:34)
    at com.myApp.lock.BaseLockService.unlock(BaseLockService.java:174)
    at com.myApp.lock.BaseLockService.lambda$unlockAllIDs[=10=](BaseLockService.java:143)
    at java.util.ArrayList.forEach(ArrayList.java:1257)
    at com.myApp.lock.BaseLockService.unlockAllIDs(BaseLockService.java:139)

这是我复制这种情况的单元测试:

@Test
public void baseLockTest() {
    List<Lockable> filesToProcess = new ArrayList<>();

    //For now only 1 to limit complexity
    Lockable fileToLock = FileSource.builder()
            .id(1)
            .build();

    filesToProcess.add(fileToLock);

    Runnable task = () -> {
        log.info("ATTEMPT LOCK");
        Set<BaseLockService.LockableHandle> lockedBatch = lockService.lockBatch(filesToProcess, 1);

        if (!lockedBatch.isEmpty()) {

            try {
                log.info("ATTEMPT FAKE PROCESS TIME SLEEP 100 MS");
                TimeUnit.MILLISECONDS.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            log.info("ATTEMPT UNLOCK");
                lockService.unlockAll(lockedBatch);
        }
    };

    System.out.println("**********************************************************");

    //Simulate two Service instances of 1 thread
    int totalThreads = 2;
    ExecutorService executorService = Executors.newFixedThreadPool(totalThreads);

    List<Future> locksProcessed = new ArrayList<>(totalThreads);
    for (int i = 0; i < 1000; i++) {
        locksProcessed.add(executorService.submit(task));
    }

    Future f;
    while(!locksProcessed.isEmpty()){
        Iterator<Future> iterator = locksProcessed.iterator();
        while(iterator.hasNext()){
            f = iterator.next();
            if(f.isDone()){
                iterator.remove();
            }
        }

    }

    System.out.println("ALL DONE!!!");
}

这是 BaseLockService 中的锁定和解锁方法:

    public Set<LockableHandle> lockBatch(final List<Lockable> desiredLock, final int batchSize) {
    Set<LockableHandle> effectivelyLocked = new HashSet<>();
    Iterator<Lockable> desiredLockIterator = desiredLock.iterator();

    while ((desiredLockIterator.hasNext()) && (effectivelyLocked.size() <= batchSize)) {
        Lockable toLock = desiredLockIterator.next();
        String lockPath = ZKPaths.makePath(getLockPath(), String.valueOf(toLock.getId()));
        InterProcessMutex lock = createMutex(lockPath);

        try {
            if (lock.acquire(0, TimeUnit.SECONDS)) {
                LockableHandle handle = new LockableHandle(toLock, lock);
                effectivelyLocked.add(handle);
                locks.put(handle.getId(), handle);
            } else {
                log.warn(String.format("Object was not locked. Object id is %d, lock path is %s.",
                        toLock.getId(),
                        lockPath));
            }
        } catch (Exception e) {
            log.error("Cannot lock path " + lockPath, e);
        }
    }

    log.info(String.format("%d object(s) were requested to lock. %d were effectively locked.",
            desiredLock.size(),
            effectivelyLocked.size()));

    return effectivelyLocked;
}

    public void unlock(final LockableHandle lockHandle) {
    boolean success = false;

    try {
        InterProcessMutex lock = lockHandle.getMutex();
        if (lock != null) {
            lock.release();
            client.delete()
                    .deletingChildrenIfNeeded()
                    .forPath(ZKPaths.makePath(getLockPath(), String.valueOf(lockHandle.getId())));
            success = true;
        }
    } catch (Exception e) {
        log.error("Can't unlock lock #" + lockHandle, e);
    } finally {
        locks.remove(lockHandle.getId());
    }

    log.info(String.format("The lock #%d was requested to be unlocked. Success = %b",
            lockHandle.getId(),
            success));
}

这是服务实例化后调用的init()方法:

    public void init() {
    log.info("Stating initialization of the Lock Service");
    locks = new HashMap<>();
    client = createClient();
    client.start();

    try {
        client.blockUntilConnected();
        if (client.isZk34CompatibilityMode()) {
            log.info("The Curator Framework is running in ZooKeeper 3.4 compatibility mode.");
        }
    } catch (InterruptedException ie) {
        log.error("Cannot connect to ZooKeeper.", ie);
    }

    log.info("Completed initialization of the Lock Service");
}

我不确定缺少什么,但没有任何选择。 感谢任何 comments/suggestions

我在您发送的 Locking Issue Example 中发现了一些问题。这些可能是该示例特有的,但如果这些也在您的代码中,它将解释您遇到的问题。

  1. Maven POM 指定不正确。 Curator 需要知道它处于 ZK 3.4.x 兼容模式 - 执行此操作的方法是 described here。 TL;DR 将 Zookeeper 从 Curator 依赖项中排除,并向 Zookeeper 3 添加直接依赖项。4.x.
  2. BaseLockService 中的 locks 字段应该是 ConcurrentHashMap
  3. BaseLockService#unlock 正在尝试通过调用 client.delete()... 来清理锁定路径。这行不通。这种代码存在固有的竞争,这就是为什么 Curator 有 "Reaper" 类 以及为什么我将容器节点推入 Zookeeper 3.5.x 的原因。请注意,正是这行代码产生了 NoNode 异常,而不是 Curator 锁定代码。我建议您摆脱该代码,不要担心它或移至 Zookeeper 3。5.x.
  4. 我认为 BaseLockService 不应该保留 re-creating InterProcessMutex。它应该保留它们的地图或其他东西。

当我应用上面的 1-3 时,测试成功通过(我尝试了多次)。我已经打开了一个 PR on your test project 并进行了 3 次更改。