高吞吐量写入变量 Java 8 并发?

High Throughput writes on a variable in Java 8 Concurrency?

如果我在 Java 8 程序中有一个简单的 Integer,它可以被多个线程读写。

如果有人告诉我应用程序需要支持高吞吐量读取和极少的写入 - 答案很简单,我只使用读写锁。然后多个线程可以同时执行读取而不会阻塞 - 只有在不频繁的写入完成时才会发生阻塞。

但是如果我被告知应用程序需要支持高吞吐量写入(即共享变量被不同线程频繁更新)。无论我在这里使用哪种锁,据我所知,它总是会导致线程阻塞——因为当一个线程获得变量锁并更新它时,其余线程也会尝试更新变量将只需要等到他们获得锁定 - 这是正确的还是我在 Java 8 中遗漏了什么?

我可以在共享变量上编写某种异步更新方法,其中线程立即调用它 returns 的更新方法,我在幕后使用某种数据结构来将对共享变量的写入排队。至少这样我可以防止线程在尝试更新共享变量时阻塞。当然,这种方法会引发其他问题,比如线程是否应该假设它保证了写定义。成功还是我应该提供回调以通知更新成功等。除了类似的事情,我认为在 Java 8 中使用任何 Lock 进行高吞吐量写入时没有办法绕过阻塞? (或者即使在高吞吐量写入的情况下,我也应该只接受阻塞并只使用 Lock )。谢谢

严格来说 Integer - 您可以使用 LongAdder,它的实现似乎完全适合您的情况。如果你关心这里有一些额外的细节。

它在底层使用 CAS(比较和交换),很像 AtomicLong,但有一些不同。首先,它包含的实际 long value 被包裹在所谓的 Cell 中 - 基本上是 class 允许 cas (比较和交换) value 到一个新值,如果你愿意的话,就像 setter 一样。这个 Cell 也用 @sun.misc.Contended 注释,以防止虚假共享;这是对它的解释(来自代码注释):

But Atomic objects residing in arrays will tend to be placed adjacent to each other, and so will most often share cache lines (with a huge negative performance impact) without this precaution.

这里的实现非常有趣。让我们看看调用 add(long x) 方法时会发生什么:

 public void add(long x) {
    Cell[] cs; long b, v; int m; Cell c;
    if ((cs = cells) != null || !casBase(b = base, b + x)) {
        boolean uncontended = true;
        if (cs == null || (m = cs.length - 1) < 0 ||
            (c = cs[getProbe() & m]) == null ||
            !(uncontended = c.cas(v = c.value, v + x)))
            longAccumulate(x, null, uncontended);
    }
}

思路是,如果Cell [] cs 为null,之前没有争用,意思是long value要么没有初始化,要么之前的所有CAS 所有线程的操作都已成功。在这种情况下,尝试将 CAS 的新值设为 long value - 如果可行,我们就完成了。如果失败,则会创建一个 Cell [] 数组,以便每个单独的线程尝试在它自己的 space 中工作,从而最大限度地减少争用。

下一句是你真正关心的,如果我正确理解了你的问题(这是我的,根本不是来自代码注释):

In simpler words: if there is no contention between threads, the work is done as if AtomicLong is used (sort of), otherwise try to create a separate space for each thread to work on.

如果你关心一些我发现有趣的额外细节:

Cell[]总是2的幂(很像HashMap内部数组);然后每个线程使用 ThreadLocalRandom 创建一些 hashCode 来尝试在数组 Cell [] cs 中找到一个条目来写入,或者甚至使用 Marsaglia XorShif 再次重新散列以尝试找到一个空闲槽在这个数组中;数组的大小限制为您拥有的内核数量(实际中最接近的 2 的幂),该数组可以调整大小,因此它可以增长并且所有这些操作都是使用 volatile int cellsBusy 自旋锁完成的。这段代码很棒,但如前所述,我没有完全理解。