同步并发请求以共享缓慢操作的结果
Synchronise concurrent requests to share results of a slow operation
我有一个 Java UI 服务,它有一个 API 方法调用一个相对较慢的操作(比如 ~30 秒)。该操作是无参数的,但它对随时间变化(相对缓慢)的外部数据进行操作。 return 最新结果的方法并不重要 - 如果它们有 30 秒的历史,那是可以接受的。
最终我需要优化缓慢操作的实现,但作为短期修复,我想让操作互斥,这样如果第二个传入请求(在单独的线程上)尝试在另一个已经在进行时调用该操作,则第二个将阻塞直到第一个完成。第二个线程然后使用第一次调用该操作的结果 - 即它不会再次尝试 运行 该操作。
例如:
class MyService {
String serviceApiMmethod() {
// If a second thread attempts to call this method while another is in progress
// then block here until the first returns and then use those results
// (allowing it to return immediately without a second call to callSlowOperation).
return callSlowOperation();
}
}
Java (8) 中首选的通用方法是什么。我猜我可以使用 CountDownLatch,但不清楚如何最好地跨线程共享结果。是否存在促进此操作的现有并发原语?
编辑: 一旦所有线程都使用了它(即 returned 到调用者),我需要清除对结果的任何引用,因为它相对较大对象,需要尽快进行 GC。
作为解决方案,您可以使用如下方法:
public class MyService {
private volatile ResultHolder holder;
public String serviceApiMethod() {
if (holder != null && !isTimedOut(holder.calculated)) {
return holder.result;
}
synchronized (this) {
if (holder != null && !isTimedOut(holder.calculated)) {
return holder.result;
}
String result = callSlowOperation();
holder = new ResultHolder(result, LocalDateTime.now());
return result;
}
}
private static class ResultHolder {
private final String result;
private final LocalDateTime calculated;
public ResultHolder(String result, LocalDateTime calculated) {
this.result = result;
this.calculated = calculated;
}
}
}
注意 MyService 必须是单例的,ResultHolder 必须是不可变的
另一种方法(我认为可能更好)是将所有请求结果的线程添加到一个同步集合中。然后当结果到达时 - 将响应发送回线程。可以使用java8个功能接口消费者,让它更花哨。它不会浪费 CPU 时间(比如 thread.sleep 甚至 countDownLatch 和其他现代 java 并发 类)。它需要这些线程有一个回调方法来接受结果,但它甚至可能使您的代码更易于阅读:
class MyService {
private static volatile boolean isProcessing;
private synchronized static boolean isProcessing() {
return isProcessing;
}
private static Set<Consumer<String>> callers=Collections.synchronizedSet(new HashSet<>());
void serviceApiMmethod(Consumer<String> callBack) {
callers.add(callBack);
callSlowOperation();
}
private synchronized static void callSlowOperation() {
if(isProcessing())
return;
isProcessing=true;
try { Thread.sleep(1000); }catch (Exception e) {}//Simulate slow operation
final String result="slow result";
callers.forEach(consumer-> consumer.accept(result));
callers.clear();
isProcessing=false;
}
}
调用线程:
class ConsumerThread implements Runnable{
final int threadNumber;
public ConsumerThread(int num) {
this.threadNumber=num;
}
public void processResponse(String response) {
System.out.println("Thread ["+threadNumber+"] got response:"+response+" at:"+System.currentTimeMillis());
}
@Override
public void run() {
new MyService().serviceApiMmethod(this::processResponse);
}
}
这样生成的对象将被垃圾回收,因为所有消费者都会立即获取并释放它。
并测试结果:
public class Test{
public static void main(String[] args) {
System.out.println(System.currentTimeMillis());
for(int i=0;i<5;i++) {
new Thread(new ConsumerThread(i)).start();
}
}
}
结果:
1542201686784
Thread [1] got response:slow result at:1542201687827
Thread [2] got response:slow result at:1542201687827
Thread [3] got response:slow result at:1542201687827
Thread [0] got response:slow result at:1542201687827
Thread [4] got response:slow result at:1542201687827
所有线程在 1 秒后得到结果。一种反应式编程;)它确实将方式改变为更加异步的方式,但是如果线程的调用者想要在获取结果时阻止执行,他可以实现它。该服务基本上是关于所做的事情的自我解释。这就像说 "my operation is slow so instead of running the call for each of you callers, I will send you the result once I am ready - give me a consumer method"
ReentrantReadWriteLock
会更容易使用:
class MyService {
String result;
ReadWriteLock lock = new ReentrantReadWriteLock();
String serviceApiMmethod() {
lock.readLock().lock();
try {
if (result == null || staleResult()) {
lock.readLock().unlock();
lock.writeLock().lock();
try {
if (result == null || staleResult()) {
result = callSlowOperation();
}
} finally {
lock.writeLock().unlock();
lock.readLock().lock();
}
}
return result;
} finally {
lock.readLock().unlock();
}
}
}
这里,读锁防止读取陈旧状态,写锁防止多个威胁同时执行"slow operation"。
简单的想法
版本 1:
class Foo {
public String foo() throws Exception {
synchronized (this) {
if (counter.incrementAndGet() == 1) {
future = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000 * (ThreadLocalRandom.current().nextInt(3) + 1));
} catch (InterruptedException e) {
}
return "ok" + ThreadLocalRandom.current().nextInt();
});
}
}
String result = future.get();
if (counter.decrementAndGet() == 0) {
future = null;
}
return result;
}
private AtomicInteger counter = new AtomicInteger();
private Future<String> future;
}
版本 2:与@AleksandrSemyannikov 一起
public class MyService {
private AtomicInteger counter = new AtomicInteger();
private volatile String result;
public String serviceApiMethod() {
counter.incrementAndGet();
try {
synchronized (this) {
if (result == null) {
result = callSlowOperation();
}
}
return result;
} finally {
if (counter.decrementAndGet() == 0) {
synchronized (this) {
if (counter.get() == 0) {
result = null;
}
}
}
}
}
private String callSlowOperation() {
try {
Thread.sleep(ThreadLocalRandom.current().nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
return Thread.currentThread().getName();
}
}
我有一个 Java UI 服务,它有一个 API 方法调用一个相对较慢的操作(比如 ~30 秒)。该操作是无参数的,但它对随时间变化(相对缓慢)的外部数据进行操作。 return 最新结果的方法并不重要 - 如果它们有 30 秒的历史,那是可以接受的。
最终我需要优化缓慢操作的实现,但作为短期修复,我想让操作互斥,这样如果第二个传入请求(在单独的线程上)尝试在另一个已经在进行时调用该操作,则第二个将阻塞直到第一个完成。第二个线程然后使用第一次调用该操作的结果 - 即它不会再次尝试 运行 该操作。
例如:
class MyService {
String serviceApiMmethod() {
// If a second thread attempts to call this method while another is in progress
// then block here until the first returns and then use those results
// (allowing it to return immediately without a second call to callSlowOperation).
return callSlowOperation();
}
}
Java (8) 中首选的通用方法是什么。我猜我可以使用 CountDownLatch,但不清楚如何最好地跨线程共享结果。是否存在促进此操作的现有并发原语?
编辑: 一旦所有线程都使用了它(即 returned 到调用者),我需要清除对结果的任何引用,因为它相对较大对象,需要尽快进行 GC。
作为解决方案,您可以使用如下方法:
public class MyService {
private volatile ResultHolder holder;
public String serviceApiMethod() {
if (holder != null && !isTimedOut(holder.calculated)) {
return holder.result;
}
synchronized (this) {
if (holder != null && !isTimedOut(holder.calculated)) {
return holder.result;
}
String result = callSlowOperation();
holder = new ResultHolder(result, LocalDateTime.now());
return result;
}
}
private static class ResultHolder {
private final String result;
private final LocalDateTime calculated;
public ResultHolder(String result, LocalDateTime calculated) {
this.result = result;
this.calculated = calculated;
}
}
}
注意 MyService 必须是单例的,ResultHolder 必须是不可变的
另一种方法(我认为可能更好)是将所有请求结果的线程添加到一个同步集合中。然后当结果到达时 - 将响应发送回线程。可以使用java8个功能接口消费者,让它更花哨。它不会浪费 CPU 时间(比如 thread.sleep 甚至 countDownLatch 和其他现代 java 并发 类)。它需要这些线程有一个回调方法来接受结果,但它甚至可能使您的代码更易于阅读:
class MyService {
private static volatile boolean isProcessing;
private synchronized static boolean isProcessing() {
return isProcessing;
}
private static Set<Consumer<String>> callers=Collections.synchronizedSet(new HashSet<>());
void serviceApiMmethod(Consumer<String> callBack) {
callers.add(callBack);
callSlowOperation();
}
private synchronized static void callSlowOperation() {
if(isProcessing())
return;
isProcessing=true;
try { Thread.sleep(1000); }catch (Exception e) {}//Simulate slow operation
final String result="slow result";
callers.forEach(consumer-> consumer.accept(result));
callers.clear();
isProcessing=false;
}
}
调用线程:
class ConsumerThread implements Runnable{
final int threadNumber;
public ConsumerThread(int num) {
this.threadNumber=num;
}
public void processResponse(String response) {
System.out.println("Thread ["+threadNumber+"] got response:"+response+" at:"+System.currentTimeMillis());
}
@Override
public void run() {
new MyService().serviceApiMmethod(this::processResponse);
}
}
这样生成的对象将被垃圾回收,因为所有消费者都会立即获取并释放它。
并测试结果:
public class Test{
public static void main(String[] args) {
System.out.println(System.currentTimeMillis());
for(int i=0;i<5;i++) {
new Thread(new ConsumerThread(i)).start();
}
}
}
结果:
1542201686784
Thread [1] got response:slow result at:1542201687827
Thread [2] got response:slow result at:1542201687827
Thread [3] got response:slow result at:1542201687827
Thread [0] got response:slow result at:1542201687827
Thread [4] got response:slow result at:1542201687827
所有线程在 1 秒后得到结果。一种反应式编程;)它确实将方式改变为更加异步的方式,但是如果线程的调用者想要在获取结果时阻止执行,他可以实现它。该服务基本上是关于所做的事情的自我解释。这就像说 "my operation is slow so instead of running the call for each of you callers, I will send you the result once I am ready - give me a consumer method"
ReentrantReadWriteLock
会更容易使用:
class MyService {
String result;
ReadWriteLock lock = new ReentrantReadWriteLock();
String serviceApiMmethod() {
lock.readLock().lock();
try {
if (result == null || staleResult()) {
lock.readLock().unlock();
lock.writeLock().lock();
try {
if (result == null || staleResult()) {
result = callSlowOperation();
}
} finally {
lock.writeLock().unlock();
lock.readLock().lock();
}
}
return result;
} finally {
lock.readLock().unlock();
}
}
}
这里,读锁防止读取陈旧状态,写锁防止多个威胁同时执行"slow operation"。
简单的想法
版本 1:
class Foo {
public String foo() throws Exception {
synchronized (this) {
if (counter.incrementAndGet() == 1) {
future = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000 * (ThreadLocalRandom.current().nextInt(3) + 1));
} catch (InterruptedException e) {
}
return "ok" + ThreadLocalRandom.current().nextInt();
});
}
}
String result = future.get();
if (counter.decrementAndGet() == 0) {
future = null;
}
return result;
}
private AtomicInteger counter = new AtomicInteger();
private Future<String> future;
}
版本 2:与@AleksandrSemyannikov 一起
public class MyService {
private AtomicInteger counter = new AtomicInteger();
private volatile String result;
public String serviceApiMethod() {
counter.incrementAndGet();
try {
synchronized (this) {
if (result == null) {
result = callSlowOperation();
}
}
return result;
} finally {
if (counter.decrementAndGet() == 0) {
synchronized (this) {
if (counter.get() == 0) {
result = null;
}
}
}
}
}
private String callSlowOperation() {
try {
Thread.sleep(ThreadLocalRandom.current().nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
return Thread.currentThread().getName();
}
}