在低锁争用的情况下用 atomic+while 循环替换 synchronized
Replace synchronized with atomic+while loop in case of low lock contention
我有两个函数必须 运行 在关键部分:
public synchronized void f1() { ... }
public synchronized void f2() { ... }
假设行为如下:
f1
几乎从未被调用过。实际上,在正常情况下,永远不会调用此方法。如果仍然调用 f1
,它应该 return 很快。
f2
被调用的频率非常高。它 return 非常快。
- 这些方法从不互相调用,也没有可重入性。
换句话说,竞争很低。因此,当 f2
被调用时,我们有一些开销来获取锁,在 99.9% 的情况下会立即授予锁。我想知道是否有方法可以避免这种开销。
我想出了以下替代方案:
private final AtomicInteger lock = new AtomicInteger(0);
public void f1() {
while (!lock.compareAndSet(0, 1)) {}
try {
...
} finally {
lock.set(0);
}
}
public void f2() {
while (!lock.compareAndSet(0, 2)) {}
try {
...
} finally {
lock.set(0);
}
}
还有其他方法吗? java.util.concurrent
包是否提供原生的东西?
更新
虽然我的目的是提出一个一般性问题,但关于我的情况的一些信息:
f1
:此方法创建一个新的远程流,如果由于某种原因当前流损坏,例如由于超时。远程流可以被视为从给定位置开始消耗远程队列的套接字连接:
private Stream stream;
public synchronized void f1() {
final Stream stream = new Stream(...);
if (this.stream != null) {
stream.setPosition(this.stream.getPosition());
}
this.stream = stream;
return stream;
}
f2
:此方法使流位置前进。这是一个普通的 setter:
public synchronized void f2(Long p) {
stream.setPosition(p);
}
在这里,stream.setPosition(Long)
也作为普通的 setter 实现:
public class Stream {
private volatile Long position = 0;
public void setPosition(Long position) {
this.position = position;
}
}
在Stream
中,当前位置会周期性异步发送到服务器。注意Stream
不是我自己实现的
我的想法是引入如上图所示的比较和交换,并将stream
标记为volatile
。
您的示例没有按照您的要求进行。当使用锁 时,您实际上是在执行代码。尝试这样的事情:
public void f1() {
while (!lock.compareAndSet(0, 1)) {
}
try {
...
} finally {
lock.set(0);
}
}
回答你的问题,我认为这不会比使用 synchronized
方法更快,而且这种方法更难阅读和理解。
另一种方法是使用像修改计数一样工作的时间戳锁。如果您的读写比率很高,这会很有效。
另一种方法是拥有一个不可变对象,该对象通过 AtomicReference 存储状态。如果您的读写比率非常高,这会很有效。
根据描述和您的示例代码,我推断如下:
Stream
有自己的内部位置,您也在外部跟踪最近的 position
。您将其用作一种 'resume point':当您需要重新初始化流时,将其推进到这一点。
最后一个已知的 position
可能已经过时;我假设这是基于你的断言,即流定期 异步 通知服务器其当前位置。
调用 f1
时,已知流处于错误状态。
函数f1
和f2
访问相同的数据,并且可能运行并发。但是,f1
和 f2
都不会 永远 运行 同时反对自己 。换句话说,你 几乎 有一个单线程程序,除了 f1
和 f2
都在执行的罕见情况。
[旁注: 我的解决方案实际上并不关心 f1
是否与自身同时被调用;它只关心 f2
没有与自己同时调用]
如果其中有任何错误,则以下解决方案是错误的。哎呀,无论如何它都可能是错误的,要么是因为遗漏了一些细节,要么是因为我犯了一个错误。编写低锁代码很难,这正是您应该避免使用它的原因,除非您观察到实际的性能问题。
static class Stream {
private long position = 0L;
void setPosition(long position) {
this.position = position;
}
}
final static class StreamInfo {
final Stream stream = new Stream();
volatile long resumePosition = -1;
final void setPosition(final long position) {
stream.setPosition(position);
resumePosition = position;
}
}
private final Object updateLock = new Object();
private final AtomicReference<StreamInfo> currentInfo = new AtomicReference<>(new StreamInfo());
void f1() {
synchronized (updateLock) {
final StreamInfo oldInfo = currentInfo.getAndSet(null);
final StreamInfo newInfo = new StreamInfo();
if (oldInfo != null && oldInfo.resumePosition > 0L) {
newInfo.setPosition(oldInfo.resumePosition);
}
// Only `f2` can modify `currentInfo`, so update it last.
currentInfo.set(newInfo);
// The `f2` thread might be waiting for us, so wake them up.
updateLock.notifyAll();
}
}
void f2(final long newPosition) {
while (true) {
final StreamInfo s = acquireStream();
s.setPosition(newPosition);
s.resumePosition = newPosition;
// Make sure the stream wasn't replaced while we worked.
// If it was, run again with the new stream.
if (acquireStream() == s) {
break;
}
}
}
private StreamInfo acquireStream() {
// Optimistic concurrency: hope we get a stream that's ready to go.
// If we fail, branch off into a slower code path that waits for it.
final StreamInfo s = currentInfo.get();
return s != null ? s : acquireStreamSlow();
}
private StreamInfo acquireStreamSlow() {
synchronized (updateLock) {
while (true) {
final StreamInfo s = currentInfo.get();
if (s != null) {
return s;
}
try {
updateLock.wait();
}
catch (final InterruptedException ignored) {
}
}
}
}
如果流出现故障并被 f1
替换,可能是之前对 f2
的调用仍在对(现已失效的)流执行一些操作。我假设这没问题,并且它不会引入不良副作用(除了那些已经存在于基于锁的版本中的副作用)。我做出这个假设是因为我们已经在上面的列表中确定您的恢复点可能已过时,并且我们还确定 f1
仅在已知流处于不良状态时才被调用。
根据我的 JMH 基准测试,这种方法比 CAS 或同步版本(它们本身非常接近)快大约 3 倍。
我有两个函数必须 运行 在关键部分:
public synchronized void f1() { ... }
public synchronized void f2() { ... }
假设行为如下:
f1
几乎从未被调用过。实际上,在正常情况下,永远不会调用此方法。如果仍然调用f1
,它应该 return 很快。f2
被调用的频率非常高。它 return 非常快。- 这些方法从不互相调用,也没有可重入性。
换句话说,竞争很低。因此,当 f2
被调用时,我们有一些开销来获取锁,在 99.9% 的情况下会立即授予锁。我想知道是否有方法可以避免这种开销。
我想出了以下替代方案:
private final AtomicInteger lock = new AtomicInteger(0);
public void f1() {
while (!lock.compareAndSet(0, 1)) {}
try {
...
} finally {
lock.set(0);
}
}
public void f2() {
while (!lock.compareAndSet(0, 2)) {}
try {
...
} finally {
lock.set(0);
}
}
还有其他方法吗? java.util.concurrent
包是否提供原生的东西?
更新
虽然我的目的是提出一个一般性问题,但关于我的情况的一些信息:
f1
:此方法创建一个新的远程流,如果由于某种原因当前流损坏,例如由于超时。远程流可以被视为从给定位置开始消耗远程队列的套接字连接:
private Stream stream;
public synchronized void f1() {
final Stream stream = new Stream(...);
if (this.stream != null) {
stream.setPosition(this.stream.getPosition());
}
this.stream = stream;
return stream;
}
f2
:此方法使流位置前进。这是一个普通的 setter:
public synchronized void f2(Long p) {
stream.setPosition(p);
}
在这里,stream.setPosition(Long)
也作为普通的 setter 实现:
public class Stream {
private volatile Long position = 0;
public void setPosition(Long position) {
this.position = position;
}
}
在Stream
中,当前位置会周期性异步发送到服务器。注意Stream
不是我自己实现的
我的想法是引入如上图所示的比较和交换,并将stream
标记为volatile
。
您的示例没有按照您的要求进行。当使用锁 时,您实际上是在执行代码。尝试这样的事情:
public void f1() {
while (!lock.compareAndSet(0, 1)) {
}
try {
...
} finally {
lock.set(0);
}
}
回答你的问题,我认为这不会比使用 synchronized
方法更快,而且这种方法更难阅读和理解。
另一种方法是使用像修改计数一样工作的时间戳锁。如果您的读写比率很高,这会很有效。
另一种方法是拥有一个不可变对象,该对象通过 AtomicReference 存储状态。如果您的读写比率非常高,这会很有效。
根据描述和您的示例代码,我推断如下:
Stream
有自己的内部位置,您也在外部跟踪最近的position
。您将其用作一种 'resume point':当您需要重新初始化流时,将其推进到这一点。最后一个已知的
position
可能已经过时;我假设这是基于你的断言,即流定期 异步 通知服务器其当前位置。调用
f1
时,已知流处于错误状态。函数
f1
和f2
访问相同的数据,并且可能运行并发。但是,f1
和f2
都不会 永远 运行 同时反对自己 。换句话说,你 几乎 有一个单线程程序,除了f1
和f2
都在执行的罕见情况。[旁注: 我的解决方案实际上并不关心
f1
是否与自身同时被调用;它只关心f2
没有与自己同时调用]
如果其中有任何错误,则以下解决方案是错误的。哎呀,无论如何它都可能是错误的,要么是因为遗漏了一些细节,要么是因为我犯了一个错误。编写低锁代码很难,这正是您应该避免使用它的原因,除非您观察到实际的性能问题。
static class Stream {
private long position = 0L;
void setPosition(long position) {
this.position = position;
}
}
final static class StreamInfo {
final Stream stream = new Stream();
volatile long resumePosition = -1;
final void setPosition(final long position) {
stream.setPosition(position);
resumePosition = position;
}
}
private final Object updateLock = new Object();
private final AtomicReference<StreamInfo> currentInfo = new AtomicReference<>(new StreamInfo());
void f1() {
synchronized (updateLock) {
final StreamInfo oldInfo = currentInfo.getAndSet(null);
final StreamInfo newInfo = new StreamInfo();
if (oldInfo != null && oldInfo.resumePosition > 0L) {
newInfo.setPosition(oldInfo.resumePosition);
}
// Only `f2` can modify `currentInfo`, so update it last.
currentInfo.set(newInfo);
// The `f2` thread might be waiting for us, so wake them up.
updateLock.notifyAll();
}
}
void f2(final long newPosition) {
while (true) {
final StreamInfo s = acquireStream();
s.setPosition(newPosition);
s.resumePosition = newPosition;
// Make sure the stream wasn't replaced while we worked.
// If it was, run again with the new stream.
if (acquireStream() == s) {
break;
}
}
}
private StreamInfo acquireStream() {
// Optimistic concurrency: hope we get a stream that's ready to go.
// If we fail, branch off into a slower code path that waits for it.
final StreamInfo s = currentInfo.get();
return s != null ? s : acquireStreamSlow();
}
private StreamInfo acquireStreamSlow() {
synchronized (updateLock) {
while (true) {
final StreamInfo s = currentInfo.get();
if (s != null) {
return s;
}
try {
updateLock.wait();
}
catch (final InterruptedException ignored) {
}
}
}
}
如果流出现故障并被 f1
替换,可能是之前对 f2
的调用仍在对(现已失效的)流执行一些操作。我假设这没问题,并且它不会引入不良副作用(除了那些已经存在于基于锁的版本中的副作用)。我做出这个假设是因为我们已经在上面的列表中确定您的恢复点可能已过时,并且我们还确定 f1
仅在已知流处于不良状态时才被调用。
根据我的 JMH 基准测试,这种方法比 CAS 或同步版本(它们本身非常接近)快大约 3 倍。