同步异步后端

Synchronize asynchronous back end

我需要创建一个 REST 端点,它将 "synchronize" 请求和响应 to/from 通过 JMS 工作的后端服务。换句话说,我的端点应该向 JMS 输入队列发送消息,在 JMS 输出队列中等待响应。如果在超时期限内没有响应,则将错误返回给消费者。对于消费者来说,这个端点应该看起来像一个普通的同步 request/response.

目前我已经使用 java.util.concurrent.Exchanger 实现了它。 我的代码(简化):

REST 端点:

@RestController
public class Endpoint {

   private ConcurrentMap<String, Exchanger> exchangers = new ConcurrentHashMap<>();

   @GetMapping("/data/{requestId}")
   public ResponseEntity<String> getData(@Parameter(in = ParameterIn.PATH, required = true) String requestId) {
      Exchanger<String> syncExchanger = createAndPutIfNotExists(requestId);
      sendToJMS(requestId);
      int timeout = 30;
      // wait for JMS response and return it
      return waitForResponse(syncExchanger, requestId, timeout);
   }

   private synchronized Exchanger<String> createAndPutIfNotExists(String requestId) {
        if (exchangers.get(requestId) != null) {
            throw new BadHeaderException("Duplicate requestId");
        }
        Exchanger<String> exchanger = new Exchanger<>();
        exchangers.put(requestId, exchanger);
        return exchanger;
   }

   private String waitForResponse(Exchanger<String> exchanger, String requestId, int timeout) {
        try {
            return exchanger.exchange(null, timeout, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } catch (TimeoutException e) {
            throw new TimeoutException("timeout on waiting JMS response.", e);
        } finally {
            exchangers.remove(requestId);
        }
   }

   @JmsListener(destination = "${jms.outputTopic}")
   public void onMessage(Message m) throws JMSException {
      String requestId = m.getStringProperty("RequestId");
      String payload = m.getBody();
      Exchanger<String> exchanger = exchangers.get(requestId );

      if (exchanger != null) {
            try {
                exchanger.exchange(payload);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                exchangers.remove(requestId );
            }
      }
   }
}

解决方案有效。但它 阻塞 在等待响应时请求线程。 然后网络服务器线程池在高负载时超出限制。

有没有办法以非阻塞的方式做到这一点?

像这样:

@GetMapping("/data/{requestId}")
   public CompletableFuture<String> getData() {
      return CompletableFuture.supplyAsync(() -> {
        sendToJMS(requestId);

        // How to wait for JMS response with some timeout ?

      });
   }

@JmsListener(destination = "${jms.outputTopic}")
   public void onMessage(Message m) throws JMSException {
      String requestId = m.getStringProperty("RequestId");
      String payload = m.getBody();

      // How to "complete" CompletableFuture ?

   }

Spring 接受 CompletableFuture 作为控制器中的 return 类型,因此您可以在 createAndPutIfNotExists() 中创建一个并在 onMessage() 中完成它。

futures 地图替换您的 exchangers 地图:

private ConcurrentMap<String, CompletableFuture<String>> futures = new ConcurrentHashMap<>();

然后修改发送部分:

@GetMapping("/data/{requestId}")
public CompletableFuture<String> getData(@PathParam("requestId") String requestId) {
    CompletableFuture<String> future = createAndPutIfNotExists(requestId);
    sendToJMS(requestId);
    int timeout = 30;
    CompletableFuture<String> result = future.orTimeout(timeout, TimeUnit.SECONDS);
    result.thenRun(() -> futures.remove(requestId, future));
    return result;
}

private synchronized CompletableFuture<String> createAndPutIfNotExists(String requestId) {
    if (futures.get(requestId) != null) {
        throw new BadHeaderException("Duplicate requestId");
    }
    CompletableFuture<String> future = new CompletableFuture<>();
    futures.put(requestId, future);
    return future;
}

请注意,超时处理是使用 Java 9 的 orTimeout() 方法执行的。如果您使用 Java 8,则需要 custom timeout hanlding.

您可能还想使用一些 thenApplyAsync(s -> s, executor) 技巧将响应提交移出 JMS/超时处理线程。

最后,只是 complete() 收到响应时的未来:

@JmsListener(destination = "${jms.outputTopic}")
public void onMessage(Message m) throws JMSException {
    String requestId = m.getStringProperty("RequestId");
    String payload = m.getBody();
    CompletableFuture<String> future = futures.get(requestId);

    if (future != null) {
        try {
            future.complete(payload);
        } finally {
            futures.remove(requestId, future);
        }
    }
}