任意键的锁定处理程序

Lock handler for arbitrary keys

我有为任意键实现 "lock handler" 的代码。给定一个 key,它确保一次只有一个线程可以 process 那个(或等于)键(这里意味着调用 externalSystem.process(key) 调用)。

到目前为止,我有这样的代码:

public class MyHandler {
    private final SomeWorkExecutor someWorkExecutor;
    private final ConcurrentHashMap<Key, Lock> lockMap = new ConcurrentHashMap<>();

    public void handle(Key key) {
        // This can lead to OOM as it creates locks without removing them
        Lock keyLock = lockMap.computeIfAbsent( 
            key, (k) -> new ReentrantLock()
        );
        keyLock.lock();
        try {
            someWorkExecutor.process(key);
        } finally {
            keyLock.unlock();
        }
    }
}

我明白这段代码可以导致OutOfMemoryError因为没有人清楚图。

我在考虑如何制作地图,它会累积有限数量的元素。当超过限制时,我们应该用新的替换最旧的访问元素(此代码应与最旧的元素同步作为监视器)。但我不知道如何进行回调,告诉我超出限制。

请分享您的想法。

P.S.

我重新阅读任务,现在我发现我有限制,即 handle 方法不能被调用超过 8 个线程。我不知道它对我有什么帮助,但我只是提到了它。

P.S.2

@Boris the Spider 提出了简单的解决方案:

} finally {
      lockMap.remove(key);
      keyLock.unlock();
}

但是在 Boris 注意到我们的代码不是线程安全的因为它破坏了行为之后:
让我们研究 3 个使用相同键调用的线程:

  1. 线程#1 获取锁,现在在 map.remove(key);
  2. 之前
  3. 线程#2 使用等号键调用,因此它在线程#1 释放锁时等待。
  4. 然后线程#1 执行 map.remove(key);。在线程#3 调用方法 handle 之后。它检查映射中是否不存在此键的锁,因此它创建新锁并获取它。
  5. 线程#1 释放了锁,因此线程#2 获得了它。
    因此线程#2 和线程#3 可以为等号键并行调用。但不应该被允许。

为了避免这种情况,在清除map之前,我们应该阻塞任何线程获取锁,而waitset中的所有线程都没有获取和释放锁。看起来需要足够复杂的同步,它会导致算法运行缓慢。当地图大小超过某个限制值时,也许我们应该不时地清除地图。

我浪费了很多时间,但不幸的是我不知道如何实现。

当您调用

时将添加新值
lockMap.computeIfAbsent()

因此您可以检查 lockMap.size() 以了解项目数量。

但是您将如何找到第一个添加的项目?最好在使用后删除项目。

您可以使用存储对象引用的进程内缓存,例如 Caffeine、Guava、EHCache 或 cache2k。这是一个如何使用 cache2k:

构建缓存的示例
final Cache<Key, Lock> locks =
  new Cache2kBuilder<Key, Lock>(){}
    .loader(
      new CacheLoader<Key, Lock>() {
        @Override
        public Lock load(Key o) {
          return new ReentrantLock();
        }
      }
    )
    .storeByReference(true)
    .entryCapacity(1000)
    .build();

使用模式如你所问:

    Lock keyLock = locks.get(key);
    keyLock.lock();
    try {
        externalSystem.process(key);
    } finally {
        keyLock.unlock();
    }

由于缓存限制为 1000 个条目,因此会自动清除不再使用的锁。

如果应用程序中的容量和线程数不匹配,缓存可能会逐出正在使用的锁。该解决方案在我们的应用程序中完美运行多年。当有足够长的 运行 任务并且超出容量时,缓存将驱逐正在使用的锁。在实际应用程序中,您始终控制生命线程的数量,例如在 Web 容器中,您会将处理线程的数量限制为(示例)100。因此您知道使用的锁永远不会超过 100 个。如果考虑到这一点,此解决方案的开销最小。

请记住,锁定仅在您的应用程序在单个 VM 上运行时有效。您可能想看看分布式锁管理器 (DLM)。提供分布式锁的产品示例:hazelcast、infinispan、teracotta、redis/redisson.

谢谢Ben Mane
我发现了这个变体。

public class MyHandler {
    private final int THREAD_COUNT = 8;
    private final int K = 100;
    private final Striped<Lock> striped = Striped.lazyWeakLock(THREAD_COUNT * K);
    private final SomeWorkExecutor someWorkExecutor = new SomeWorkExecutor();

    public void handle(Key key) throws InterruptedException {
        Lock keyLock = striped.get(key);

        keyLock.lock();
        try {
            someWorkExecutor.process(key);
        } finally {
            keyLock.unlock();
        }       
    }
}

您不需要尝试将大小限制为某个任意值 - 事实证明,您可以完成这种 "lock handler" 习语,同时只存储 exactly 当前锁定在地图中的键数。

这个想法是使用一个简单的约定:成功添加映射到地图算作"lock"操作,删除它算作[=51] =] 操作。这巧妙地避免了在某些线程仍将其锁定和其他竞争条件时删除映射的问题。

此时mapping中的value只是用来阻塞其他同key到达的线程,需要等到mapping被移除

这是一个示例1,其中 CountDownLatch 而不是 Lock 作为映射值:

public void handle(Key key) throws InterruptedException {
    CountDownLatch latch = new CountDownLatch(1);

    // try to acquire the lock by inserting our latch as a
    // mapping for key        
    while(true) {
        CountDownLatch existing = lockMap.putIfAbsent(key, latch);
        if (existing != null) {
            // there is an existing key, wait on it
            existing.await();
        } else {
            break;
        }
    }

    try {
        externalSystem.process(key);
    } finally {
        lockMap.remove(key);
        latch.countDown();
    }
}

在这里,映射的生命周期仅在持有锁时有效。地图的条目永远不会超过对不同键的并发请求。

您的方法的不同之处在于映射不是 "re-used" - 每个 handle 调用都会创建一个新的锁存器和映射。由于您已经在执行昂贵的原子操作,因此这在实践中不太可能会减慢速度。另一个缺点是,有很多等待线程,all 在闩锁倒计时时被唤醒,但只有一个会成功放入新映射并因此获得锁 - 其余的返回睡在新锁上。

可以构建另一个版本,其中re-uses线程出现时的映射并等待现有映射。基本上,解锁线程只是对其中一个等待线程执行 "handoff"。只有一个映射将用于等待同一键的一整套线程——它按顺序传递给每个线程。大小仍然有限制,因为没有更多线程在等待给定的映射,它仍然被删除。

为了实现它,您将 CountDownLatch 替换为可以计算等待线程数的映射值。当一个线程进行解锁时,它首先检查是否有任何线程在等待,如果有则唤醒一个线程进行切换。如果没有线程在等待,它 "destroys" 对象(即设置对象不再在映射中的标志)并将其从映射中删除。

你需要在适当的锁定下进行上述操作,并且有一些棘手的细节。在实践中,我发现上面的简短示例效果很好。


1 即时编写,未编译也未测试,但这个想法可行。

一种方法是完全放弃并发哈希映射,仅使用带锁定的常规 HashMap 来执行所需的映射操作并自动锁定状态。

乍一看,这似乎降低了系统的并发性,但如果我们假设 process(key) 调用相对于非常快速的锁操作来说是冗长的,那么它工作得很好,因为 process()仍然 运行 同时调用。独占临界区中只发生少量且固定的工作量。

这是一个草图:

public class MyHandler {

    private static class LockHolder {
        ReentrantLock lock = new ReentrantLock();
        int refcount = 0;
        void lock(){
            lock.lock();
        }
    } 

    private final SomeWorkExecutor someWorkExecutor;
    private final Lock mapLock = new ReentrantLock();
    private final HashMap<Key, LockHolder> lockMap = new HashMap<>();

    public void handle(Key key) {

        // lock the map
        mapLock.lock();
        LockHolder holder = lockMap.computeIfAbsent(key, k -> new LockHolder());
        // the lock in holder is either unlocked (newly created by us), or an existing lock, let's increment refcount
        holder.refcount++;
        mapLock.unlock();

        holder.lock();

        try {
            someWorkExecutor.process(key);
        } finally {
            mapLock.lock()
            keyLock.unlock();
            if (--holder.refcount == 0) {
              // no more users, remove lock holder
              map.remove(key);
            }
            mapLock.unlock();
        }
    }
}

我们使用refcount,它只在共享mapLock下操作,以跟踪有多少锁的用户。只要引用计数为零,我们就可以在退出处理程序时删除该条目。这种方法很好,因为如果 process() 调用与锁定开销相比相对昂贵,则它很容易推理并且会执行良好。由于地图操作发生在共享锁下,因此添加额外的逻辑也很简单,例如,在地图中保留一些 Holder 对象,跟踪统计信息等。

这是一个简短而有趣的版本,它利用 weak version of Guava's Interner class 来为每个键提供一个 "canonical" 对象以用作锁,并且实施弱引用语义,以便清理未使用的条目。

public class InternerHandler {
    private final Interner = Interners.newWeakInterner();

    public void handle(Key key) throws InterruptedException {
        Key canonKey = Interner.intern(key);
        synchronized (canonKey) {
            someWorkExecutor.process(key);
        }       
    }
}

基本上我们要求 canonical canonKeyequal()key,然后锁定这个 canonKey .每个人都会就规范密钥达成一致,因此所有传递相同密钥的调用者都会就要锁定的对象达成一致。

Interner 的弱特性意味着只要不使用规范密钥,就可以删除条目,这样就可以避免在 Interner 中积累条目。稍后,如果再次出现相同的键,则会选择一个新的规范条目。

上面的简单代码依赖于 built-in 监视器到 synchronize - 但如果这对您不起作用(例如,它已经用于其他目的),您可以包含一个锁定对象在 Key class 或创建一个 holder 对象。

class MyHandler {
    private final Map<Key, Lock> lockMap = Collections.synchronizedMap(new WeakHashMap<>());
    private final SomeWorkExecutor someWorkExecutor = new SomeWorkExecutor();

    public void handle(Key key) throws InterruptedException {
        Lock keyLock = lockMap.computeIfAbsent(key, (k) -> new ReentrantLock()); 
        keyLock.lock();
        try {
            someWorkExecutor.process(key);
        } finally {
            keyLock.unlock();
        }
    }
}

您可以依靠方法 compute(K key, BiFunction<? super K,? super V,? extends V> remappingFunction) 来同步对给定键的方法 process 的调用,您甚至不需要再使用 Lock 作为地图的价值,因为您不再依赖它了。

这个想法是依靠你的 ConcurrentHashMap 的内部锁定机制来执行你的方法,这将允许线程并行执行 process 键的方法,其对应的散列不是部分同一个箱子。这等同于基于条带锁的方法,只是您不需要额外的第三方库。

条带锁的方法很有趣,因为它在内存占用方面非常轻,因为您只需要有限数量的锁就可以做到这一点,因此您的锁所需的内存占用是已知的并且永远不会改变,这不是为每个键使用一个锁的方法(如您的问题),因此通常 better/recommended 使用基于条带锁的方法来满足这种需求。

所以你的代码可能是这样的:

// This will create a ConcurrentHashMap with an initial table size of 16   
// bins by default, you may provide an initialCapacity and loadFactor
// if too much or not enough to get the expected table size in order
// increase or reduce the concurrency level of your map
// NB: We don't care much of the type of the value so I arbitrarily
// used Void but it could be any type like simply Object
private final ConcurrentMap<Key, Void> lockMap = new ConcurrentHashMap<>();

public void handle(Key lockKey) {
    // Execute the method process through the remapping Function
    lockMap.compute(
        lockKey,
        (key, value) -> {
            // Execute the process method under the protection of the
            // lock of the bin of hashes corresponding to the key
            someWorkExecutor.process(key);
            // Returns null to keep the Map empty
            return null;
        }
    );
}

注意 1: 和我们一样 returns null 地图将永远是空的,因此您永远不会 运行 内存不足因为这张地图。

注意 2: 由于我们从不影响给定键的值,请注意也可以使用方法 computeIfAbsent(K key, Function<? super K,? extends V> mappingFunction):

public void handle(Key lockKey) {
    // Execute the method process through the remapping Function
    lockMap.computeIfAbsent(
        lockKey,
        key -> {
            // Execute the process method under the protection of the
            // lock of the segment of hashes corresponding to the key
            someWorkExecutor.process(key);
            // Returns null to keep the Map empty
            return null;
        }
    );
}

注意 3: 确保你的方法 process 永远不会为任何键调用方法 handle 因为你最终会无限循环(相同键)或死锁(其他非有序键,例如:如果一个线程调用 handle(key1) 然后 process 在内部调用 handle(key2) 而另一个线程并行调用 handle(key2) 然后 process 内部调用 handle(key1),无论使用何种方法,你都会遇到死锁)。此行为并非特定于此方法,任何方法都会发生。

每次为 key 创建和删除锁对象是一项性能开销很大的操作。当您从并发映射(比如缓存)执行 add/remove 锁定时,必须确保缓存中的 putting/removing 对象本身就是 thread-safe。所以这似乎不是个好主意,但可以通过 ConcurrentHashMap

来实现

条带锁定方法(内部并发哈希映射也使用)是更好的方法。从Google Guava docs开始解释为

When you want to associate a lock with an object, the key guarantee you need is that if key1.equals(key2), then the lock associated with key1 is the same as the lock associated with key2.

The crudest way to do this is to associate every key with the same lock, which results in the coarsest synchronization possible. On the other hand, you can associate every distinct key with a different lock, but this requires linear memory consumption and concurrency management for the system of locks itself, as new keys are discovered.

Striped allows the programmer to select a number of locks, which are distributed between keys based on their hash code. This allows the programmer to dynamically select a tradeoff between concurrency and memory consumption, while retaining the key invariant that if key1.equals(key2), then striped.get(key1) == striped.get(key2)

代码:

//declare globally; e.g. class field level
Striped<Lock> rwLockStripes = Striped.lock(16);

    Lock lock = rwLockStripes.get("key");
    lock.lock();
    try {
        // do you work here
    } finally {
        lock.unlock();
    }

以下代码片段有助于实现 putting/removal 锁定。

private ConcurrentHashMap<String, ReentrantLock> caches = new ConcurrentHashMap<>();

public void processWithLock(String key) {
    ReentrantLock lock = findAndGetLock(key);
    lock.lock();
    try {
        // do you work here

    } finally {
        unlockAndClear(key, lock);
    }
}

private void unlockAndClear(String key, ReentrantLock lock) {
    // *** Step 1: Release the lock.
    lock.unlock();
    // *** Step 2: Attempt to remove the lock
    // This is done by calling compute method, if given lock is present in
    // cache. if current lock object in cache is same instance as 'lock'
    // then remove it from cache. If not, some other thread is succeeded in
    // putting new lock object and hence we can leave the removal of lock object to that
    // thread.
    caches.computeIfPresent(key, (k, current) -> lock == current ? null : current);

}

private ReentrantLock findAndGetLock(String key) {
    // Merge method given us the access to the previously( if available) and
    // newer lock object together.
    return caches.merge(key, new ReentrantLock(), (older, newer) -> nonNull(older) ? older : newer);
}

与其自己编写,不如尝试 JKeyLockManager。来自项目描述:

JKeyLockManager provides fine-grained locking with application specific keys.

现场给出的示例代码:

public class WeatherServiceProxy {
  private final KeyLockManager lockManager = KeyLockManagers.newManager();

  public void updateWeatherData(String cityName, float temperature) {
    lockManager.executeLocked(cityName, () -> delegate.updateWeatherData(cityName, temperature)); 
  }