并行无锁升序 id 生成

Parallel lock-free ascending id generation

我有一张地图,它应该将字符串与 id 相关联。 id 之间必须有间隔,并且它们必须是从 0 到 N 的唯一整数。

请求总是带有两个字符串,其中一个、两个或 none 可能已经被索引。 该地图是从 ForkJoin 池并行构建的,理想情况下我想避免显式同步块。我正在寻找一种最佳方式来最大化带锁或不带锁的吞吐量。

我不知道如何使用 AtomicInteger 而不为地图中已经存在的键按顺序创建间隙。

public class Foo {
    private final Map<String, Integer> idGenerator = new ConcurrentHashMap<>();

    // invoked from multiple threads
    public void update(String key1, String key2) {
      idGenerator.dosomething(key, ?) // should save the key and unique id
      idGenerator.dosomething(key2, ?) // should save the key2 and its unique id
      Bar bar = new Bar(idGenerator.get(key), idGenerator.get(key2));
      // ... do something with bar
   }
}

我认为 size() 方法结合 merge() 可能会解决问题,但我不能完全说服自己。谁能建议解决这个问题的方法?

编辑

关于重复标志,这无法按照链接答案中的建议使用 AtomicInteger.incrementAndGet() 解决。如果我对每个字符串盲目地这样做,那么序列中就会有 gaps。需要 compound 操作来检查密钥是否存在,然后才生成 id。 我正在寻找一种通过 Map API.

实现这种复合操作的方法

第二个提供的答案违背了我在问题中明确提出的要求。

没有办法完全按照您想要的方式进行操作 -- ConcurrentHashMap is not in and of itself, lock-free. However, you can do it atomically without having to do any explicit lock management by using the java.util.Map.computeIfAbsent 函数。

这是一个代码示例,其风格与您提供的内容相同,应该可以帮助您继续前进。

ConcurrentHashMap<String, Integer> keyMap = new ConcurrentHashMap<>();
AtomicInteger sequence = new AtomicInteger();

public void update(String key1, String key2) {
    Integer id1 = keyMap.computeIfAbsent(key1, s -> sequence.getAndIncrement());
    Integer id2 = keyMap.computeIfAbsent(key2, s -> sequence.getAndIncrement());

    Bar bar = new Bar(id1, id2);
    // ... do something with bar
}

我不确定您能否完全按照自己的意愿行事。不过,您可以批处理一些更新,或者与枚举/添加分开进行检查。

很多答案都假设顺序并不重要:您需要给所有字符串一个数字,但即使在一对中重新排序也可以,对吧?并发可能已经导致对的重新排序,或者对的成员不会获得连续的数字,但重新排序可能会导致对中的第一个获得更大的数字。

latency is not that important. This application should chew large amount of data and eventually produce output. Most of the time there should be a search hit in a map

如果大多数搜索命中,那么我们主要需要地图上的读取吞吐量。

单个编写器线程可能就足够了。

所以并发的readers可以检查它们的输入,而不是直接添加到主映射,如果不存在,将它们添加到要枚举的队列中并添加到主映射ConcurrentHashMap. 队列可以是一个简单的无锁队列,也可以是另一个 ConCurrentHashMap 来过滤掉尚未添加的候选队列中的重复项。但可能无锁队列是好的。

那么您就不需要原子计数器,或者当 2 个线程看到相同的字符串之前将计数器递增两次时,它们中的任何一个都可以将其添加到映射中,这有任何问题。 (否则这是个大问题。)

如果作者有办法锁定 ConcurrentHashMap 以提高批量更新的效率,那可能会很好。但是,如果预计命中率非常高,您真的希望其他 reader 线程在我们增长它时尽可能多地过滤重复项,而不是暂停它。


为了减少主要前端线程之间的争用,您可以有多个队列,比如每个线程可能有一个单生产者/单消费者队列,或者一组 4 个线程 运行一对物理内核共享一个队列。

枚举线程从所有这些线程中读取。

在 reader 不与写入者争用的队列中,枚举线程没有争用。但是多个队列减少了作者之间的争用。 (写入这些队列的线程是以只读方式访问主 ConcurrentHashMap 的线程,如果命中率高,大多数 CPU 时间将花费在其中。)


某种 read-copy-update (RCU) 数据结构可能很好,如果 Java 有 。它会让 readers 继续全速过滤掉重复项,同时枚举线程构造一个新的 table 并完成一批插入,在构建新的 table 时竞争为零.


以 90% 的命中率,一个编写器线程可能跟上 10 个左右的 reader 线程,这些线程根据主 table.

过滤新键

您可能想要设置一些队列大小限制以允许来自单个写入器线程的背压。或者,如果您拥有的内核/线程多于单个编写器可以跟上的速度,那么也许某种并发设置可以让多个线程在编号之前消除重复项是有帮助的。

或者说真的,如果你能等到最后给所有的东西都编号,我想那会简单得多。

我想过也许在竞争条件下尝试在有错误余地的情况下进行编号,然后回过头来解决问题,但这可能并不好。