Vertx EventBus 回复 'Specific' 消息
Vertx EventBus Reply a 'Specific' Message
我们有一个案例如下:
问题在于,协调器从方法上下文发出消息并从另一个方法上下文获取响应:
private void forwardToVWClient(Message msg) {
vertx.eventBus().send(RESTClient.ADDRESS, msg.body(), deliveryOptions, res -> {
if (res.succeeded()) {
log.info("forwardToVWClient. VW got result : success.");
// do not reply ok until we get an OK from the Listener verticle
} else {
log.error("forwardToVWClient VW got result : failure.");
msg.fail(500, res.cause().getMessage());
}
});
}
然后我有另一个事件总线消费方法,我在其中接收响应:
vertx.eventBus().consumer(ADDRESS_RESPONSE, this::handleResponseMessage);
private void handleResponseMessage(Message msg) {
// how to reply the message received in the context of forwardToVWClient ??
}
那么,当我在 handleResponseMessage
中收到回复时,如何在 forwardToVWClient
的上下文中回复消息?
到目前为止的几个想法:
- 将消息放入顶点上下文中?
- 消息对象有一个字段:
.replyAddress()
returns 一个 int,我将其保存在静态 ConcurrentHashMap 中并使用它来回复特定消息。我会 post 更多细节作为答案。
有没有更好的方法?
实现它的一种方法是保存消息的 replyAddress
字段并使用它来将消息发回给发起者。
下面是一些简化的代码,展示了如何:
public class VehicleStateCoordinatorVerticle extends AbstractVerticle {
final static String ADDRESS_REQUEST = "CoordinatorRequest";
final static String ADDRESS_RESPONSE = "CoordinatorResponse";
static ConcurrentHashMap<String, VWApiRequest> pendingCommands = new ConcurrentHashMap<>();
public void start() {
vertx.eventBus().consumer(ADDRESS_REQUEST, this::handleRequestMessage);
vertx.eventBus().consumer(ADDRESS_RESPONSE, this::handleResponseMessage);
log.info("===== VehicleStateCoordinatorVerticle - bus consumer ready =====");
}
private void handleRequestMessage(Message msg) {
// .... omitted for brevity
// save the replyAddress and the command for later/callback
cmd.setReplyAddress(msg.replyAddress());
pendingCommands.put(cmd.getVwReference(), cmd);
forwardToVWClient(msg);
}
private void forwardToVWClient(Message msg) {
vertx.eventBus().send(AbstractOEMClientVerticle.ADDRESS, msg.body(), deliveryOptions, res -> {
if (res.succeeded()) {
log.info("forwardToVWClient. VW got result : success.");
// do not reply ok until we get an OK from the VWAPIServer verticle
} else {
log.error("forwardToVWClient VW got result : failure.");
msg.fail(500, res.cause().getMessage());
}
});
}
private void handleResponseMessage(Message msg) {
//..
VWApiRequest vwApiRequest = pendingCommands.get(vwReference);
if(vwApiRequest == null){
log.error("No pending vwApiRequest could be found!");
return;
}
/**
* Instead of targeting the RESTApi address,
* we use the replyAddress to target the specific message that is pending response.
*/
vertx.eventBus().send(vwApiRequest.getReplyAddress(), body, deliveryOptions, res -> {
if (res.succeeded()) {
// cheers!
}
else{
log.error("Error in handleResponseMessage {}", res.cause().getMessage());
}
});
}
可以吗?
或不。让我知道。
class AVerticle {
public void start() throws Exception {
router.route(path).method(HttpMethod.GET).handler(ctx -> replyHandler(ctx, replyPort));
}
private void replyHandler(RoutingContext ctx, String replyPort) {
String uuid = UUID.randomUUID().toString();
// send a message and waiting for reply (Msg1)
Future<Message<Object>> future = vertx.eventBus().request(replyPort
, Constaints.NO_REPLY
, new DeliveryOptions()
.addHeader(Constaints.NO_REPLY, uuid));
// then send the real data, markup with the same uuid (Msg2)
vertx.eventBus().send(replyPort, "your data", new DeliveryOptions()
.addHeader(Constaints.UUID, uuid));
// once received the result, process on.
future.onSuccess(msg -> {
HttpServerResponse response = ctx.response();
String result = msg.body().toString();
response.end(result);
});
}
}
class BVerticle {
// uuid: Msg1_Obj
private Map<String, Message<?>> messageMap = new ConcurrentHashMap<String, Message<?>>();
// you will get two message on replyPort
public void start() throws Exception {
msgConsumer = vertx.eventBus().localConsumer(inport, this::msgHandler);
}
private void msgHandler(Message<Object> message) {
// store (Msg1) in memory
if (Constaints.NO_REPLY.equals(message.body().toString())) {
messageMap.put(message.headers().get(Constaints.NO_REPLY), message);
}
// do what you want with your data (Msg2)
else {
// your business code here
String result = "your result";
String uuid = json.getString(message.headers().get(Constaints.UUID));
// get (Msg1) store in memory
Message<?> theReplyMessage = messageMap.get(uuid);
// use (Msg1) 's replyAddress to reply (Msg2) 's data
theReplyMessage.reply(result);
// clear the memory
messageMap.remove(uuid);
}
}
}
我们有一个案例如下:
问题在于,协调器从方法上下文发出消息并从另一个方法上下文获取响应:
private void forwardToVWClient(Message msg) {
vertx.eventBus().send(RESTClient.ADDRESS, msg.body(), deliveryOptions, res -> {
if (res.succeeded()) {
log.info("forwardToVWClient. VW got result : success.");
// do not reply ok until we get an OK from the Listener verticle
} else {
log.error("forwardToVWClient VW got result : failure.");
msg.fail(500, res.cause().getMessage());
}
});
}
然后我有另一个事件总线消费方法,我在其中接收响应:
vertx.eventBus().consumer(ADDRESS_RESPONSE, this::handleResponseMessage);
private void handleResponseMessage(Message msg) {
// how to reply the message received in the context of forwardToVWClient ??
}
那么,当我在 handleResponseMessage
中收到回复时,如何在 forwardToVWClient
的上下文中回复消息?
到目前为止的几个想法:
- 将消息放入顶点上下文中?
- 消息对象有一个字段:
.replyAddress()
returns 一个 int,我将其保存在静态 ConcurrentHashMap 中并使用它来回复特定消息。我会 post 更多细节作为答案。
有没有更好的方法?
实现它的一种方法是保存消息的 replyAddress
字段并使用它来将消息发回给发起者。
下面是一些简化的代码,展示了如何:
public class VehicleStateCoordinatorVerticle extends AbstractVerticle {
final static String ADDRESS_REQUEST = "CoordinatorRequest";
final static String ADDRESS_RESPONSE = "CoordinatorResponse";
static ConcurrentHashMap<String, VWApiRequest> pendingCommands = new ConcurrentHashMap<>();
public void start() {
vertx.eventBus().consumer(ADDRESS_REQUEST, this::handleRequestMessage);
vertx.eventBus().consumer(ADDRESS_RESPONSE, this::handleResponseMessage);
log.info("===== VehicleStateCoordinatorVerticle - bus consumer ready =====");
}
private void handleRequestMessage(Message msg) {
// .... omitted for brevity
// save the replyAddress and the command for later/callback
cmd.setReplyAddress(msg.replyAddress());
pendingCommands.put(cmd.getVwReference(), cmd);
forwardToVWClient(msg);
}
private void forwardToVWClient(Message msg) {
vertx.eventBus().send(AbstractOEMClientVerticle.ADDRESS, msg.body(), deliveryOptions, res -> {
if (res.succeeded()) {
log.info("forwardToVWClient. VW got result : success.");
// do not reply ok until we get an OK from the VWAPIServer verticle
} else {
log.error("forwardToVWClient VW got result : failure.");
msg.fail(500, res.cause().getMessage());
}
});
}
private void handleResponseMessage(Message msg) {
//..
VWApiRequest vwApiRequest = pendingCommands.get(vwReference);
if(vwApiRequest == null){
log.error("No pending vwApiRequest could be found!");
return;
}
/**
* Instead of targeting the RESTApi address,
* we use the replyAddress to target the specific message that is pending response.
*/
vertx.eventBus().send(vwApiRequest.getReplyAddress(), body, deliveryOptions, res -> {
if (res.succeeded()) {
// cheers!
}
else{
log.error("Error in handleResponseMessage {}", res.cause().getMessage());
}
});
}
可以吗? 或不。让我知道。
class AVerticle {
public void start() throws Exception {
router.route(path).method(HttpMethod.GET).handler(ctx -> replyHandler(ctx, replyPort));
}
private void replyHandler(RoutingContext ctx, String replyPort) {
String uuid = UUID.randomUUID().toString();
// send a message and waiting for reply (Msg1)
Future<Message<Object>> future = vertx.eventBus().request(replyPort
, Constaints.NO_REPLY
, new DeliveryOptions()
.addHeader(Constaints.NO_REPLY, uuid));
// then send the real data, markup with the same uuid (Msg2)
vertx.eventBus().send(replyPort, "your data", new DeliveryOptions()
.addHeader(Constaints.UUID, uuid));
// once received the result, process on.
future.onSuccess(msg -> {
HttpServerResponse response = ctx.response();
String result = msg.body().toString();
response.end(result);
});
}
}
class BVerticle {
// uuid: Msg1_Obj
private Map<String, Message<?>> messageMap = new ConcurrentHashMap<String, Message<?>>();
// you will get two message on replyPort
public void start() throws Exception {
msgConsumer = vertx.eventBus().localConsumer(inport, this::msgHandler);
}
private void msgHandler(Message<Object> message) {
// store (Msg1) in memory
if (Constaints.NO_REPLY.equals(message.body().toString())) {
messageMap.put(message.headers().get(Constaints.NO_REPLY), message);
}
// do what you want with your data (Msg2)
else {
// your business code here
String result = "your result";
String uuid = json.getString(message.headers().get(Constaints.UUID));
// get (Msg1) store in memory
Message<?> theReplyMessage = messageMap.get(uuid);
// use (Msg1) 's replyAddress to reply (Msg2) 's data
theReplyMessage.reply(result);
// clear the memory
messageMap.remove(uuid);
}
}
}