Vert.x EventBus: Replying to a 'Specific' Message

In our use case, the concern is that the coordinator sends a message from one method context and receives the response in another:

private void forwardToVWClient(Message msg) {

        vertx.eventBus().send(RESTClient.ADDRESS, msg.body(), deliveryOptions, res -> {
            if (res.succeeded()) {
                log.info("forwardToVWClient. VW got result : success.");
                // do not reply ok until we get an OK from the Listener verticle

            } else {
                log.error("forwardToVWClient VW got result : failure.");
                msg.fail(500, res.cause().getMessage());
            }
        });
    }

Then I have another event bus consumer method, where I receive the response:

vertx.eventBus().consumer(ADDRESS_RESPONSE, this::handleResponseMessage);


private void handleResponseMessage(Message msg) {
        // how to reply the message received in the context of forwardToVWClient ?? 
}

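So, when I receive the response in handleResponseMessage, how do I reply to the message that was received in the context of forwardToVWClient?

A couple of ideas so far:

  1. Put the message in the verticle context?
  2. The Message object has a method, replyAddress(), that returns a String; I save it in a static ConcurrentHashMap and use it to reply to the specific message. I will post more details as an answer.

Is there a better way?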

One way to achieve this is to save the message's replyAddress and use it to send a message back to the originator.

Below is some simplified code that shows how:

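import io.vertx.core.AbstractVerticle;
import io.vertx.core.eventbus.Message;
import java.util.concurrent.ConcurrentHashMap;

public class VehicleStateCoordinatorVerticle extends AbstractVerticle {

    static final String ADDRESS_REQUEST = "CoordinatorRequest";
    static final String ADDRESS_RESPONSE = "CoordinatorResponse";

    // requests awaiting a response on ADDRESS_RESPONSE, keyed by their VW reference
    static final ConcurrentHashMap<String, VWApiRequest> pendingCommands = new ConcurrentHashMap<>();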


    public void start() {
        vertx.eventBus().consumer(ADDRESS_REQUEST, this::handleRequestMessage);
        vertx.eventBus().consumer(ADDRESS_RESPONSE, this::handleResponseMessage);
        log.info("===== VehicleStateCoordinatorVerticle - bus consumer ready =====");
    }

    private void handleRequestMessage(Message msg) {

            // .... omitted for brevity
            // save the replyAddress and the command for later/callback
            cmd.setReplyAddress(msg.replyAddress());
            pendingCommands.put(cmd.getVwReference(), cmd);


            forwardToVWClient(msg);
    }


    private void forwardToVWClient(Message msg) {

        vertx.eventBus().send(AbstractOEMClientVerticle.ADDRESS, msg.body(), deliveryOptions, res -> {
            if (res.succeeded()) {
                log.info("forwardToVWClient. VW got result : success.");
                // do not reply ok until we get an OK from the VWAPIServer verticle

            } else {
                log.error("forwardToVWClient VW got result : failure.");
                msg.fail(500, res.cause().getMessage());
            }
        });
    }



    private void handleResponseMessage(Message<Object> msg) {

        //..
        // remove (rather than get) so the entry does not stay in the map after it is answered
        VWApiRequest vwApiRequest = pendingCommands.remove(vwReference);
        if (vwApiRequest == null) {
            log.error("No pending vwApiRequest could be found!");
            return;
        }

        // Instead of targeting the RESTApi address, we use the saved replyAddress
        // to target the specific message that is still waiting for a response.
        vertx.eventBus().send(vwApiRequest.getReplyAddress(), body, deliveryOptions, res -> {
            if (res.succeeded()) {
                // cheers!
            } else {
                log.error("Error in handleResponseMessage {}", res.cause().getMessage());
            }
        });
    }
}
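
The bookkeeping behind this approach can be sketched without Vert.x. The class below is only an illustration: PendingReplies, register and take are hypothetical names that are not part of Vert.x, and the reply address is treated as an opaque String. It stores one reply address per reference and hands it back at most once, so answered entries do not pile up in the map:

```java
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical helper: correlates a request reference with the reply address saved for it. */
final class PendingReplies {

    private final ConcurrentMap<String, String> replyAddressByReference = new ConcurrentHashMap<>();

    /** Remembers where to reply; returns false if the reference is already pending. */
    boolean register(String reference, String replyAddress) {
        return replyAddressByReference.putIfAbsent(reference, replyAddress) == null;
    }

    /** Atomically looks up and removes the entry, so each request is answered at most once. */
    Optional<String> take(String reference) {
        return Optional.ofNullable(replyAddressByReference.remove(reference));
    }

    /** Number of requests still waiting for a response. */
    int size() {
        return replyAddressByReference.size();
    }
}
```

In the verticle above, register would correspond to the put in handleRequestMessage and take to the lookup in handleResponseMessage; an empty Optional is the "no pending request" case.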

Does the following work for you or not? Let me know.

class AVerticle extends AbstractVerticle {

    public void start() throws Exception {
        router.route(path).method(HttpMethod.GET).handler(ctx -> replyHandler(ctx, replyPort));
    }

    private void replyHandler(RoutingContext ctx, String replyPort) {
        
        String uuid = UUID.randomUUID().toString();
        
        // send a placeholder message and wait for its reply (Msg1)
        Future<Message<Object>> future = vertx.eventBus().request(replyPort
                , Constaints.NO_REPLY
                , new DeliveryOptions()
                    .addHeader(Constaints.NO_REPLY, uuid));

        // then send the real data, tagged with the same uuid (Msg2)
        vertx.eventBus().send(replyPort, "your data", new DeliveryOptions()
                    .addHeader(Constaints.UUID, uuid));

        // once the reply to Msg1 arrives, continue processing
        future.onSuccess(msg -> {
            HttpServerResponse response = ctx.response();
            String result = msg.body().toString();
            response.end(result);
        });
    }
    
}

class BVerticle extends AbstractVerticle {

    // uuid -> Msg1, held until the matching Msg2 has been processed
    private final Map<String, Message<?>> messageMap = new ConcurrentHashMap<>();

    // both messages (Msg1 and Msg2) arrive on the same address
    public void start() throws Exception {
        msgConsumer = vertx.eventBus().localConsumer(inport, this::msgHandler);
    }

    private void msgHandler(Message<Object> message) {
        // Msg1: keep the message so it can be replied to later
        if (Constaints.NO_REPLY.equals(message.body().toString())) {
            messageMap.put(message.headers().get(Constaints.NO_REPLY), message);
        }
        // Msg2: carries the real data
        else {
            // your business code here
            String result = "your result";

            // the uuid header links Msg2 to the stored Msg1
            String uuid = message.headers().get(Constaints.UUID);

            // fetch and remove Msg1 in one step, which also frees the memory
            Message<?> theReplyMessage = messageMap.remove(uuid);

            // reply to Msg1 with the result computed from Msg2's data
            if (theReplyMessage != null) {
                theReplyMessage.reply(result);
            }
        }
    }
    
}
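
One caveat with holding Msg1 in a map: if Msg2 never arrives, the entry stays there, while the requester eventually gives up (the event bus applies a send timeout, 30 seconds by default unless changed through DeliveryOptions). Below is a minimal sketch of expiring held replies, written in plain Java so it can be run on its own. DeferredReplies, hold, complete and expire are hypothetical names, and a Consumer<String> stands in for the stored Message's reply method:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/** Hypothetical helper: holds reply callbacks by uuid and drops the ones that waited too long. */
final class DeferredReplies {

    private static final class Entry {
        final Consumer<String> replier;
        final long deadlineMillis;

        Entry(Consumer<String> replier, long deadlineMillis) {
            this.replier = replier;
            this.deadlineMillis = deadlineMillis;
        }
    }

    private final Map<String, Entry> byUuid = new ConcurrentHashMap<>();

    /** Stores the callback (the stand-in for Msg1) until the given deadline. */
    void hold(String uuid, Consumer<String> replier, long deadlineMillis) {
        byUuid.put(uuid, new Entry(replier, deadlineMillis));
    }

    /** Removes the entry and replies; returns false if nothing is held for this uuid. */
    boolean complete(String uuid, String result) {
        Entry entry = byUuid.remove(uuid);
        if (entry == null) {
            return false;
        }
        entry.replier.accept(result);
        return true;
    }

    /** Drops every entry whose deadline has passed; returns how many were dropped. */
    int expire(long nowMillis) {
        int removed = 0;
        for (Map.Entry<String, Entry> e : byUuid.entrySet()) {
            if (e.getValue().deadlineMillis <= nowMillis && byUuid.remove(e.getKey(), e.getValue())) {
                removed++;
            }
        }
        return removed;
    }
}
```

In a verticle, expire could be called from a periodic timer with the current time; the deadline is passed in explicitly here only to keep the sketch deterministic.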