同步异步通信
Synchronize asynchronous communication
我有一个 REST 服务,它接收一些数据并通过异步 IBM MQ 请求检查数据。
REST 控制器:
@RestController
@RequestMapping("/request")
public class RequestController {
@RequestMapping(method = RequestMethod.POST)
public Response postRequest(@RequestBody Request request) {
String data = request.getData();
jmsSender.send(data);
// Now I need the response from MQ
// String mqResponse = ...
if (mqIsValid(mqResponse)) {
return createValidResponse();
}
return createNotValidResponse();
}
}
MQ 发件人:
@Service
public class JmsSender {
public void send(String data) {
jmsTemplate.convertAndSend("QUEUE.TO.MQ", data);
}
}
MQ 接收方:
@Component
public class JmsReceiver {
@JmsListener(destination = "QUEUE.FROM.MQ, containerFactory = "DefaultJmsListenerContainerFactory")
public void receiveMessage(String message) {
// How to pass the message to the controller?
}
}
如何等待来自 MQ 的正确数据在控制器中创建正确的响应?
是否可以像 here 中描述的那样使用 BlockingQueue
?就我而言,我必须区分数据。我不能只从阻塞队列中取出第一个数据。
如果同时有两个 REST 请求(数据:abc
和 xyz
)。我如何才能确保回答正确答案而不仅仅是我从 MQ 获得的第一个答案?
我也改不了MQ接口
尝试使用如下所示的 CountDownLatch。
@RestController
@RequestMapping("/request")
public class RequestController {
@RequestMapping(method = RequestMethod.POST)
public Response postRequest(@RequestBody Request request) {
final CountDownLatch jmsLatch = new CountDownLatch (1);
String data = request.getData();
jmsSender.send(data, jmsLatch);
try {
latch.await(); // wait untill latch counted down to 0
} catch (InterruptedException e) {
return createNotValidResponse();
}
return createValidResponse();
}
}
修改发送方法以从控制器获取CountDownLatch。
@Service
public class JmsSender {
public void send(String data, final CountDownLatch jmsLatch) {
jmsLatch.await();
jmsTemplate.convertAndSend("QUEUE.TO.MQ", data);
}
}
修改接收方法以从控制器获取相同的CountDownLatch。
@Component
public class JmsReceiver {
@JmsListener(destination = "QUEUE.FROM.MQ", containerFactory = "DefaultJmsListenerContainerFactory")
public void receiveMessage(String message, final CountDownLatch jmsLatch) {
// Pass the message to the controller
jmsLatch.countDown();
}
}
这里的技巧是您必须将相同的 CountDownLatch 实例从控制器传播到发送者和接收者 class 并在收到消息后调用 countDown 方法。
由于找不到适合我的解决方案,我创建了一个简单的等待机制来获取数据。
MqReceiver:
@Component
public class JmsReceiver {
private final Lock lock;
private final Condition containsKey;
private final Map<String, String> responses;
public JmsReceiver() {
this.lock = new ReentrantLock();
this.containsKey = lock.newCondition();
this.responses = new HashMap<>();
}
@JmsListener(destination = "QUEUE.FROM.MQ", containerFactory = "DefaultJmsListenerContainerFactory")
public void receiveMessage(String message) {
put(getKeyFromMessage(message), message);
}
public String get(String key) throws InterruptedException {
lock.lock();
try {
while (!responses.containsKey(key)) {
containsKey.await();
}
return responses.get(key);
} finally {
lock.unlock();
}
}
public void put(String key, String messagee) {
lock.lock();
try {
responses.put(key, messagee);
containsKey.signalAll();
} finally {
lock.unlock();
}
}
}
这个可以在控制器中使用:
@RestController
@RequestMapping("/request")
public class RequestController {
@RequestMapping(method = RequestMethod.POST)
public Response postRequest(@RequestBody Request request) {
String data = request.getData();
jmsSender.send(data);
String key = getKeyFromData(data);
// waits until MQ sends the data
String mqResponse = jmsReceiver.get(key);
if (mqIsValid(mqResponse)) {
return createValidResponse();
}
return createNotValidResponse();
}
}
场景 sync-async 的解决方案,使用 jms(activemq)
实现 request-reply 模式
这个例子的想法是在不同的 jvm 中的两个不同的服务中工作。该解决方案与多个实例服务并发测试:
服务 1 (M1) - 休息 api 同步并在某个时候启动一个
异步流使用 activemq 调用第二个服务 M2 实现集成模式 Request-Reply。您不需要停止或等待任何线程,jms 模式实现了 ack Session.AUTO_ACKNOWLEDGE。
@PostMapping
public AnyDto sendMessage(final AnyDto anyDto) {
return routeService.send(anyDto);
}
public void flowOrchestation (final anyDto data) throws JMSException {
final ObjectMessage objectMessage = composeTemplateMessage(data);
final AnyDto responseDto = jmsMessagingTemplate.convertSendAndReceive(new ActiveMQQueue("queue.request"),
objectMessage, AnyDto.class);
}
private ObjectMessage composeTemplateMessage(final AnyDto data) throws JMSException {
jmsTemplate.setReceiveTimeout(10000L);
jmsMessagingTemplate.setJmsTemplate(jmsTemplate);
Session session = jmsMessagingTemplate.getConnectionFactory().createConnection()
.createSession(false, Session.AUTO_ACKNOWLEDGE);
final ObjectMessage objectMessage = session.createObjectMessage(data);
objectMessage.setJMSCorrelationID(UUID.randomUUID().toString());
objectMessage.setJMSReplyTo(new ActiveMQQueue("queue.response"));
objectMessage.setJMSExpiration(0);
objectMessage.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT);
return objectMessage;
}
Timeout and expiration can be modified depending your requeriments. 0 expiration means no time to expire.
- Service 2 (M2): 只接收消息并响应M1上设置的JmsReplyTo。
@Component
public class Consumer implements SessionAwareMessageListener<Message> {
@Override
@JmsListener(destination = "${queue.request}")
public void onMessage(Message message, Session session) throws JMSException {
AnyDto anyDto = (AnyDto) ((ActiveMQObjectMessage) message).getObject();
//do some stuff
final ObjectMessage responseMessage = new ActiveMQObjectMessage();
responseMessage.setJMSCorrelationID(message.getJMSCorrelationID());
responseMessage.setObject(dtoModified);
final MessageProducer producer = session.createProducer(message.getJMSReplyTo());
producer.send(responseMessage);
}}
我有一个 REST 服务,它接收一些数据并通过异步 IBM MQ 请求检查数据。
REST 控制器:
@RestController
@RequestMapping("/request")
public class RequestController {
@RequestMapping(method = RequestMethod.POST)
public Response postRequest(@RequestBody Request request) {
String data = request.getData();
jmsSender.send(data);
// Now I need the response from MQ
// String mqResponse = ...
if (mqIsValid(mqResponse)) {
return createValidResponse();
}
return createNotValidResponse();
}
}
MQ 发件人:
@Service
public class JmsSender {
public void send(String data) {
jmsTemplate.convertAndSend("QUEUE.TO.MQ", data);
}
}
MQ 接收方:
@Component
public class JmsReceiver {
@JmsListener(destination = "QUEUE.FROM.MQ, containerFactory = "DefaultJmsListenerContainerFactory")
public void receiveMessage(String message) {
// How to pass the message to the controller?
}
}
如何等待来自 MQ 的正确数据在控制器中创建正确的响应?
是否可以像 here 中描述的那样使用 BlockingQueue
?就我而言,我必须区分数据。我不能只从阻塞队列中取出第一个数据。
如果同时有两个 REST 请求(数据:abc
和 xyz
)。我如何才能确保回答正确答案而不仅仅是我从 MQ 获得的第一个答案?
我也改不了MQ接口
尝试使用如下所示的 CountDownLatch。
@RestController
@RequestMapping("/request")
public class RequestController {
@RequestMapping(method = RequestMethod.POST)
public Response postRequest(@RequestBody Request request) {
final CountDownLatch jmsLatch = new CountDownLatch (1);
String data = request.getData();
jmsSender.send(data, jmsLatch);
try {
latch.await(); // wait untill latch counted down to 0
} catch (InterruptedException e) {
return createNotValidResponse();
}
return createValidResponse();
}
}
修改发送方法以从控制器获取CountDownLatch。
@Service
public class JmsSender {
public void send(String data, final CountDownLatch jmsLatch) {
jmsLatch.await();
jmsTemplate.convertAndSend("QUEUE.TO.MQ", data);
}
}
修改接收方法以从控制器获取相同的CountDownLatch。
@Component
public class JmsReceiver {
@JmsListener(destination = "QUEUE.FROM.MQ", containerFactory = "DefaultJmsListenerContainerFactory")
public void receiveMessage(String message, final CountDownLatch jmsLatch) {
// Pass the message to the controller
jmsLatch.countDown();
}
}
这里的技巧是您必须将相同的 CountDownLatch 实例从控制器传播到发送者和接收者 class 并在收到消息后调用 countDown 方法。
由于找不到适合我的解决方案,我创建了一个简单的等待机制来获取数据。
MqReceiver:
@Component
public class JmsReceiver {
private final Lock lock;
private final Condition containsKey;
private final Map<String, String> responses;
public JmsReceiver() {
this.lock = new ReentrantLock();
this.containsKey = lock.newCondition();
this.responses = new HashMap<>();
}
@JmsListener(destination = "QUEUE.FROM.MQ", containerFactory = "DefaultJmsListenerContainerFactory")
public void receiveMessage(String message) {
put(getKeyFromMessage(message), message);
}
public String get(String key) throws InterruptedException {
lock.lock();
try {
while (!responses.containsKey(key)) {
containsKey.await();
}
return responses.get(key);
} finally {
lock.unlock();
}
}
public void put(String key, String messagee) {
lock.lock();
try {
responses.put(key, messagee);
containsKey.signalAll();
} finally {
lock.unlock();
}
}
}
这个可以在控制器中使用:
@RestController
@RequestMapping("/request")
public class RequestController {
@RequestMapping(method = RequestMethod.POST)
public Response postRequest(@RequestBody Request request) {
String data = request.getData();
jmsSender.send(data);
String key = getKeyFromData(data);
// waits until MQ sends the data
String mqResponse = jmsReceiver.get(key);
if (mqIsValid(mqResponse)) {
return createValidResponse();
}
return createNotValidResponse();
}
}
场景 sync-async 的解决方案,使用 jms(activemq)
实现 request-reply 模式这个例子的想法是在不同的 jvm 中的两个不同的服务中工作。该解决方案与多个实例服务并发测试:
服务 1 (M1) - 休息 api 同步并在某个时候启动一个 异步流使用 activemq 调用第二个服务 M2 实现集成模式 Request-Reply。您不需要停止或等待任何线程,jms 模式实现了 ack Session.AUTO_ACKNOWLEDGE。
@PostMapping public AnyDto sendMessage(final AnyDto anyDto) { return routeService.send(anyDto); } public void flowOrchestation (final anyDto data) throws JMSException { final ObjectMessage objectMessage = composeTemplateMessage(data); final AnyDto responseDto = jmsMessagingTemplate.convertSendAndReceive(new ActiveMQQueue("queue.request"), objectMessage, AnyDto.class); } private ObjectMessage composeTemplateMessage(final AnyDto data) throws JMSException { jmsTemplate.setReceiveTimeout(10000L); jmsMessagingTemplate.setJmsTemplate(jmsTemplate); Session session = jmsMessagingTemplate.getConnectionFactory().createConnection() .createSession(false, Session.AUTO_ACKNOWLEDGE); final ObjectMessage objectMessage = session.createObjectMessage(data); objectMessage.setJMSCorrelationID(UUID.randomUUID().toString()); objectMessage.setJMSReplyTo(new ActiveMQQueue("queue.response")); objectMessage.setJMSExpiration(0); objectMessage.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT); return objectMessage; }
Timeout and expiration can be modified depending your requeriments. 0 expiration means no time to expire.
- Service 2 (M2): 只接收消息并响应M1上设置的JmsReplyTo。
@Component
public class Consumer implements SessionAwareMessageListener<Message> {
@Override
@JmsListener(destination = "${queue.request}")
public void onMessage(Message message, Session session) throws JMSException {
AnyDto anyDto = (AnyDto) ((ActiveMQObjectMessage) message).getObject();
//do some stuff
final ObjectMessage responseMessage = new ActiveMQObjectMessage();
responseMessage.setJMSCorrelationID(message.getJMSCorrelationID());
responseMessage.setObject(dtoModified);
final MessageProducer producer = session.createProducer(message.getJMSReplyTo());
producer.send(responseMessage);
}}