高吞吐量写入变量 Java 8 并发?
High Throughput writes on a variable in Java 8 Concurrency?
如果我在 Java 8 程序中有一个简单的 Integer,它可以被多个线程读写。
如果有人告诉我应用程序需要支持高吞吐量读取和极少的写入 - 答案很简单,我只使用读写锁。然后多个线程可以同时执行读取而不会阻塞 - 只有在不频繁的写入完成时才会发生阻塞。
但是如果我被告知应用程序需要支持高吞吐量写入(即共享变量被不同线程频繁更新)。无论我在这里使用哪种锁,据我所知,它总是会导致线程阻塞——因为当一个线程获得变量锁并更新它时,其余线程也会尝试更新变量将只需要等到他们获得锁定 - 这是正确的还是我在 Java 8 中遗漏了什么?
我可以在共享变量上编写某种异步更新方法,其中线程立即调用它 returns 的更新方法,我在幕后使用某种数据结构来将对共享变量的写入排队。至少这样我可以防止线程在尝试更新共享变量时阻塞。当然,这种方法会引发其他问题,比如线程是否应该假设它保证了写定义。成功还是我应该提供回调以通知更新成功等。除了类似的事情,我认为在 Java 8 中使用任何 Lock 进行高吞吐量写入时没有办法绕过阻塞? (或者即使在高吞吐量写入的情况下,我也应该只接受阻塞并只使用 Lock )。谢谢
严格来说 Integer
- 您可以使用 LongAdder
,它的实现似乎完全适合您的情况。如果你关心这里有一些额外的细节。
它在底层使用 CAS
(比较和交换),很像 AtomicLong
,但有一些不同。首先,它包含的实际 long value
被包裹在所谓的 Cell
中 - 基本上是 class 允许 cas
(比较和交换) value
到一个新值,如果你愿意的话,就像 setter 一样。这个 Cell
也用 @sun.misc.Contended
注释,以防止虚假共享;这是对它的解释(来自代码注释):
But Atomic objects residing in arrays will tend to be placed adjacent to each other, and so will most often share cache lines (with a huge negative performance impact) without this precaution.
这里的实现非常有趣。让我们看看调用 add(long x)
方法时会发生什么:
public void add(long x) {
Cell[] cs; long b, v; int m; Cell c;
if ((cs = cells) != null || !casBase(b = base, b + x)) {
boolean uncontended = true;
if (cs == null || (m = cs.length - 1) < 0 ||
(c = cs[getProbe() & m]) == null ||
!(uncontended = c.cas(v = c.value, v + x)))
longAccumulate(x, null, uncontended);
}
}
思路是,如果Cell [] cs
为null,之前没有争用,意思是long value
要么没有初始化,要么之前的所有CAS
所有线程的操作都已成功。在这种情况下,尝试将 CAS
的新值设为 long value
- 如果可行,我们就完成了。如果失败,则会创建一个 Cell []
数组,以便每个单独的线程尝试在它自己的 space 中工作,从而最大限度地减少争用。
下一句是你真正关心的,如果我正确理解了你的问题(这是我的,根本不是来自代码注释):
In simpler words: if there is no contention between threads, the work is done as if AtomicLong
is used (sort of), otherwise try to create a separate space for each thread to work on.
如果你关心一些我发现有趣的额外细节:
Cell[]
总是2的幂(很像HashMap
内部数组);然后每个线程使用 ThreadLocalRandom
创建一些 hashCode 来尝试在数组 Cell [] cs
中找到一个条目来写入,或者甚至使用 Marsaglia XorShif
再次重新散列以尝试找到一个空闲槽在这个数组中;数组的大小限制为您拥有的内核数量(实际中最接近的 2 的幂),该数组可以调整大小,因此它可以增长并且所有这些操作都是使用 volatile int cellsBusy
自旋锁完成的。这段代码很棒,但如前所述,我没有完全理解。
如果我在 Java 8 程序中有一个简单的 Integer,它可以被多个线程读写。
如果有人告诉我应用程序需要支持高吞吐量读取和极少的写入 - 答案很简单,我只使用读写锁。然后多个线程可以同时执行读取而不会阻塞 - 只有在不频繁的写入完成时才会发生阻塞。
但是如果我被告知应用程序需要支持高吞吐量写入(即共享变量被不同线程频繁更新)。无论我在这里使用哪种锁,据我所知,它总是会导致线程阻塞——因为当一个线程获得变量锁并更新它时,其余线程也会尝试更新变量将只需要等到他们获得锁定 - 这是正确的还是我在 Java 8 中遗漏了什么?
我可以在共享变量上编写某种异步更新方法,其中线程立即调用它 returns 的更新方法,我在幕后使用某种数据结构来将对共享变量的写入排队。至少这样我可以防止线程在尝试更新共享变量时阻塞。当然,这种方法会引发其他问题,比如线程是否应该假设它保证了写定义。成功还是我应该提供回调以通知更新成功等。除了类似的事情,我认为在 Java 8 中使用任何 Lock 进行高吞吐量写入时没有办法绕过阻塞? (或者即使在高吞吐量写入的情况下,我也应该只接受阻塞并只使用 Lock )。谢谢
严格来说 Integer
- 您可以使用 LongAdder
,它的实现似乎完全适合您的情况。如果你关心这里有一些额外的细节。
它在底层使用 CAS
(比较和交换),很像 AtomicLong
,但有一些不同。首先,它包含的实际 long value
被包裹在所谓的 Cell
中 - 基本上是 class 允许 cas
(比较和交换) value
到一个新值,如果你愿意的话,就像 setter 一样。这个 Cell
也用 @sun.misc.Contended
注释,以防止虚假共享;这是对它的解释(来自代码注释):
But Atomic objects residing in arrays will tend to be placed adjacent to each other, and so will most often share cache lines (with a huge negative performance impact) without this precaution.
这里的实现非常有趣。让我们看看调用 add(long x)
方法时会发生什么:
public void add(long x) {
Cell[] cs; long b, v; int m; Cell c;
if ((cs = cells) != null || !casBase(b = base, b + x)) {
boolean uncontended = true;
if (cs == null || (m = cs.length - 1) < 0 ||
(c = cs[getProbe() & m]) == null ||
!(uncontended = c.cas(v = c.value, v + x)))
longAccumulate(x, null, uncontended);
}
}
思路是,如果Cell [] cs
为null,之前没有争用,意思是long value
要么没有初始化,要么之前的所有CAS
所有线程的操作都已成功。在这种情况下,尝试将 CAS
的新值设为 long value
- 如果可行,我们就完成了。如果失败,则会创建一个 Cell []
数组,以便每个单独的线程尝试在它自己的 space 中工作,从而最大限度地减少争用。
下一句是你真正关心的,如果我正确理解了你的问题(这是我的,根本不是来自代码注释):
In simpler words: if there is no contention between threads, the work is done as if
AtomicLong
is used (sort of), otherwise try to create a separate space for each thread to work on.
如果你关心一些我发现有趣的额外细节:
Cell[]
总是2的幂(很像HashMap
内部数组);然后每个线程使用 ThreadLocalRandom
创建一些 hashCode 来尝试在数组 Cell [] cs
中找到一个条目来写入,或者甚至使用 Marsaglia XorShif
再次重新散列以尝试找到一个空闲槽在这个数组中;数组的大小限制为您拥有的内核数量(实际中最接近的 2 的幂),该数组可以调整大小,因此它可以增长并且所有这些操作都是使用 volatile int cellsBusy
自旋锁完成的。这段代码很棒,但如前所述,我没有完全理解。