同步异步后端
Synchronize asynchronous back end
我需要创建一个 REST 端点,它将 "synchronize" 请求和响应 to/from 通过 JMS 工作的后端服务。换句话说,我的端点应该向 JMS 输入队列发送消息,在 JMS 输出队列中等待响应。如果在超时期限内没有响应,则将错误返回给消费者。对于消费者来说,这个端点应该看起来像一个普通的同步 request/response.
目前我已经使用 java.util.concurrent.Exchanger 实现了它。
我的代码(简化):
REST 端点:
@RestController
public class Endpoint {
private ConcurrentMap<String, Exchanger> exchangers = new ConcurrentHashMap<>();
@GetMapping("/data/{requestId}")
public ResponseEntity<String> getData(@Parameter(in = ParameterIn.PATH, required = true) String requestId) {
Exchanger<String> syncExchanger = createAndPutIfNotExists(requestId);
sendToJMS(requestId);
int timeout = 30;
// wait for JMS response and return it
return waitForResponse(syncExchanger, requestId, timeout);
}
private synchronized Exchanger<String> createAndPutIfNotExists(String requestId) {
if (exchangers.get(requestId) != null) {
throw new BadHeaderException("Duplicate requestId");
}
Exchanger<String> exchanger = new Exchanger<>();
exchangers.put(requestId, exchanger);
return exchanger;
}
private String waitForResponse(Exchanger<String> exchanger, String requestId, int timeout) {
try {
return exchanger.exchange(null, timeout, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return "interrupted";
} catch (TimeoutException e) {
throw new TimeoutException("timeout on waiting JMS response.", e);
} finally {
exchangers.remove(requestId);
}
}
@JmsListener(destination = "${jms.outputTopic}")
public void onMessage(Message m) throws JMSException {
String requestId = m.getStringProperty("RequestId");
String payload = m.getBody();
Exchanger<String> exchanger = exchangers.get(requestId );
if (exchanger != null) {
try {
exchanger.exchange(payload);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
exchangers.remove(requestId );
}
}
}
}
解决方案有效。但它 阻塞 在等待响应时请求线程。
然后网络服务器线程池在高负载时超出限制。
有没有办法以非阻塞的方式做到这一点?
像这样:
@GetMapping("/data/{requestId}")
public CompletableFuture<String> getData() {
return CompletableFuture.supplyAsync(() -> {
sendToJMS(requestId);
// How to wait for JMS response with some timeout ?
});
}
@JmsListener(destination = "${jms.outputTopic}")
public void onMessage(Message m) throws JMSException {
String requestId = m.getStringProperty("RequestId");
String payload = m.getBody();
// How to "complete" CompletableFuture ?
}
Spring 接受 CompletableFuture
作为控制器中的 return 类型,因此您可以在 createAndPutIfNotExists()
中创建一个并在 onMessage()
中完成它。
用 futures
地图替换您的 exchangers
地图:
private ConcurrentMap<String, CompletableFuture<String>> futures = new ConcurrentHashMap<>();
然后修改发送部分:
@GetMapping("/data/{requestId}")
public CompletableFuture<String> getData(@PathParam("requestId") String requestId) {
CompletableFuture<String> future = createAndPutIfNotExists(requestId);
sendToJMS(requestId);
int timeout = 30;
CompletableFuture<String> result = future.orTimeout(timeout, TimeUnit.SECONDS);
result.thenRun(() -> futures.remove(requestId, future));
return result;
}
private synchronized CompletableFuture<String> createAndPutIfNotExists(String requestId) {
if (futures.get(requestId) != null) {
throw new BadHeaderException("Duplicate requestId");
}
CompletableFuture<String> future = new CompletableFuture<>();
futures.put(requestId, future);
return future;
}
请注意,超时处理是使用 Java 9 的 orTimeout()
方法执行的。如果您使用 Java 8,则需要 custom timeout hanlding.
您可能还想使用一些 thenApplyAsync(s -> s, executor)
技巧将响应提交移出 JMS/超时处理线程。
最后,只是 complete()
收到响应时的未来:
@JmsListener(destination = "${jms.outputTopic}")
public void onMessage(Message m) throws JMSException {
String requestId = m.getStringProperty("RequestId");
String payload = m.getBody();
CompletableFuture<String> future = futures.get(requestId);
if (future != null) {
try {
future.complete(payload);
} finally {
futures.remove(requestId, future);
}
}
}
我需要创建一个 REST 端点,它将 "synchronize" 请求和响应 to/from 通过 JMS 工作的后端服务。换句话说,我的端点应该向 JMS 输入队列发送消息,在 JMS 输出队列中等待响应。如果在超时期限内没有响应,则将错误返回给消费者。对于消费者来说,这个端点应该看起来像一个普通的同步 request/response.
目前我已经使用 java.util.concurrent.Exchanger 实现了它。 我的代码(简化):
REST 端点:
@RestController
public class Endpoint {
private ConcurrentMap<String, Exchanger> exchangers = new ConcurrentHashMap<>();
@GetMapping("/data/{requestId}")
public ResponseEntity<String> getData(@Parameter(in = ParameterIn.PATH, required = true) String requestId) {
Exchanger<String> syncExchanger = createAndPutIfNotExists(requestId);
sendToJMS(requestId);
int timeout = 30;
// wait for JMS response and return it
return waitForResponse(syncExchanger, requestId, timeout);
}
private synchronized Exchanger<String> createAndPutIfNotExists(String requestId) {
if (exchangers.get(requestId) != null) {
throw new BadHeaderException("Duplicate requestId");
}
Exchanger<String> exchanger = new Exchanger<>();
exchangers.put(requestId, exchanger);
return exchanger;
}
private String waitForResponse(Exchanger<String> exchanger, String requestId, int timeout) {
try {
return exchanger.exchange(null, timeout, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return "interrupted";
} catch (TimeoutException e) {
throw new TimeoutException("timeout on waiting JMS response.", e);
} finally {
exchangers.remove(requestId);
}
}
@JmsListener(destination = "${jms.outputTopic}")
public void onMessage(Message m) throws JMSException {
String requestId = m.getStringProperty("RequestId");
String payload = m.getBody();
Exchanger<String> exchanger = exchangers.get(requestId );
if (exchanger != null) {
try {
exchanger.exchange(payload);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
exchangers.remove(requestId );
}
}
}
}
解决方案有效。但它 阻塞 在等待响应时请求线程。 然后网络服务器线程池在高负载时超出限制。
有没有办法以非阻塞的方式做到这一点?
像这样:
@GetMapping("/data/{requestId}")
public CompletableFuture<String> getData() {
return CompletableFuture.supplyAsync(() -> {
sendToJMS(requestId);
// How to wait for JMS response with some timeout ?
});
}
@JmsListener(destination = "${jms.outputTopic}")
public void onMessage(Message m) throws JMSException {
String requestId = m.getStringProperty("RequestId");
String payload = m.getBody();
// How to "complete" CompletableFuture ?
}
Spring 接受 CompletableFuture
作为控制器中的 return 类型,因此您可以在 createAndPutIfNotExists()
中创建一个并在 onMessage()
中完成它。
用 futures
地图替换您的 exchangers
地图:
private ConcurrentMap<String, CompletableFuture<String>> futures = new ConcurrentHashMap<>();
然后修改发送部分:
@GetMapping("/data/{requestId}")
public CompletableFuture<String> getData(@PathParam("requestId") String requestId) {
CompletableFuture<String> future = createAndPutIfNotExists(requestId);
sendToJMS(requestId);
int timeout = 30;
CompletableFuture<String> result = future.orTimeout(timeout, TimeUnit.SECONDS);
result.thenRun(() -> futures.remove(requestId, future));
return result;
}
private synchronized CompletableFuture<String> createAndPutIfNotExists(String requestId) {
if (futures.get(requestId) != null) {
throw new BadHeaderException("Duplicate requestId");
}
CompletableFuture<String> future = new CompletableFuture<>();
futures.put(requestId, future);
return future;
}
请注意,超时处理是使用 Java 9 的 orTimeout()
方法执行的。如果您使用 Java 8,则需要 custom timeout hanlding.
您可能还想使用一些 thenApplyAsync(s -> s, executor)
技巧将响应提交移出 JMS/超时处理线程。
最后,只是 complete()
收到响应时的未来:
@JmsListener(destination = "${jms.outputTopic}")
public void onMessage(Message m) throws JMSException {
String requestId = m.getStringProperty("RequestId");
String payload = m.getBody();
CompletableFuture<String> future = futures.get(requestId);
if (future != null) {
try {
future.complete(payload);
} finally {
futures.remove(requestId, future);
}
}
}