正在创建然后关闭的 Vertx JS Eventbus 连接

Vertx JS Eventbus connection being created then closing

背景: 在 [=73= 上使用 Vertx 3.3.3 CoreWeb ] 端作为服务器,使用 vertx-web-3.3.3-client.js 作为客户端 sockjsclient1.1.2.js

问题: 我在本地计算机或 LAN 上成功地从客户端连接到事件总线。当我通过代理时,wss 事件总线连接被阻止(在 Firefox 中我看到“Firefox 无法建立到 "wss://..." 的连接;在 Chromium 中我看到“WebSocket连接到 wss://... 失败:WebSocket 握手期间出错:意外响应代码:400”,然后我看到“https://.../eventbus/... /xhr_send?t=... 加载资源失败:服务器响应状态代码 500")。但是,onopen 触发并我收到了一些数据(连接降级为可接受的协议?)。紧接着,onclose 触发并且我失去了连接。我知道我已成功到达 Java 顶点服务器,因为我的静态网站和 API 调用正常。

问题: 我已经广泛阅读了 Vertx 和 SockJS 文档。有没有:

  1. 有关 vertx 如何在 JS Eventbus 连接中尝试不同传输协议的文档?
  2. JS Vertx Eventbus 通过业务代理工作的例子?
  3. 实现 Eventbus 连接的另一种方法,也许将 SockJS 协议指定为 try/use? (我正在尝试创建事件总线连接的最简单方法,如文档中许多地方所示)
  4. 我需要在 SockJS/Eventbus 设置的 Java 端做些什么?

提前感谢任何advice/help!

编辑 1: 为 Java 服务器和 Java 脚本 Web 客户端添加以下代码。 Web 端非常基础(以及失败的地方)。 Java 端正在使用 Spring 进行依赖注入和应用程序配置,有一个 Eventbus 连接,一个 API 调用,并提供静态 Web 内容。

从客户端到服务器的 API 调用有效,并且服务器正确获取 Web 内容,因此访问该工具有效。但是,代理导致 wss 失败(正如预期的那样),但降级到 xhr-streaming 失败了(我认为)

Java脚本:

var EB;
var URL;
var APICall = "api/eventbus/publish/";
var IncomingAddress = "heartbeat-test";
var OutgoingAddress = "client-test";

function createConnection(){
    URL = $("#serveraddressinput").val(); //Getting url from html text box
    console.log("Creating Eventbus connection at " + URL + "eventbus"); //Eventbus address is '<link>/eventbus'
    EB = new EventBus(URL + "eventbus");

    testAPICall();

    EB.onopen = function(){
        console.log("Eventbus connection successfully made at " + URL + "eventbus");
        console.log("Registering Eventbus handler for messages at " + IncomingAddress);

        EB.registerHandler(IncomingAddress, function(error, message){
            console.log("Received Eventbus message " + JSON.stringify(message));
    };

    EB.onclose = function(){
        console.log("Eventbus connection at " + URL + " has been lost");
        URL = "";
    };
}

function testAPICall(){
    var link = URL + APICall + "heartbeat-test";
    console.log("Testing API call to " + link);
    $.ajax({
        url: link,
        type: 'POST',
        data: JSON.stringify({"testFromClient": "Test message sent from Client via API Call"}),
        dataType: 'json',
        success: function (data, textStatus) {
            console.log("API Call Success: " + JSON.stringify(data));
        },
        error: function (request, error) {
            console.log("API Call ERROR: " + JSON.stringify(request) + " " + error);
        }
    });
}

function sendTestMessage(){
    console.log("Sending test message to address " + OutgoingAddress);
    EB.send(OutgoingAddress, "Testing 1, 2, 3...");
}

Java:

...

import io.vertx.core.AbstractVerticle;
import io.vertx.core.Future;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpServerOptions;
import io.vertx.core.json.Json;
import io.vertx.core.json.JsonObject;
import io.vertx.core.net.JksOptions;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.RoutingContext;
import io.vertx.ext.web.handler.BodyHandler;
import io.vertx.ext.web.handler.CorsHandler;
import io.vertx.ext.web.handler.StaticHandler;
import io.vertx.ext.web.handler.sockjs.BridgeEvent;
import io.vertx.ext.web.handler.sockjs.BridgeEventType;
import io.vertx.ext.web.handler.sockjs.BridgeOptions;
import io.vertx.ext.web.handler.sockjs.PermittedOptions;
import io.vertx.ext.web.handler.sockjs.SockJSHandler;
import org.apache.logging.log4j.Level;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.Marker;
import org.slf4j.MarkerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

@Service
public class MyTestVerticle extends AbstractVerticle {

    private final static Logger log = LoggerFactory.getLogger(MyTestVerticle.class);

    final Level ACCESS = Level.forName("ACCESS", 450);

    private boolean started;

    private int port;

    @Value("${webserver.testpath.enabled}")
    private boolean testPathEnabled;

    @Value("${webserver.urlpath.test}")
    private String testUrlPath;

    @Value("${webserver.filepath.test}")
    private String testFilePath;

    @Value("${webserver.caching.enabled}")
    private boolean cachingEnabled;

    @Value("${webserver.ssl.enabled}")
    private boolean sslEnabled;

    private BridgeOptions bridgeOptions;

    private SockJSHandler sockJsHandler;

    private Router router;

    private JksOptions sslKeyStoreOptions;

    private JksOptions sslTrustStoreOptions;

    public MyTestVerticle() {
        this.started = false;
    }

    @Override
    public void start(Future<Void> fut) throws Exception {
        log.info("start() -- starting Vertx Verticle with eventbus, API handler, and static file handler");

        // grab the router
        router = getRouter();

        // enable CORS for the router 
        CorsHandler corsHandler = CorsHandler.create("*");  //Wildcard(*) not allowed if allowCredentials is true
        corsHandler.allowedMethod(HttpMethod.OPTIONS);
        corsHandler.allowedMethod(HttpMethod.GET);
        corsHandler.allowedMethod(HttpMethod.POST);
        corsHandler.allowedMethod(HttpMethod.PUT);
        corsHandler.allowedMethod(HttpMethod.DELETE);
        corsHandler.allowCredentials(false);
        corsHandler.allowedHeader("Access-Control-Request-Method");
        corsHandler.allowedHeader("Access-Control-Allow-Method");
        corsHandler.allowedHeader("Access-Control-Allow-Credentials");
        corsHandler.allowedHeader("Access-Control-Allow-Origin");
        corsHandler.allowedHeader("Access-Control-Allow-Headers");
        corsHandler.allowedHeader("Content-Type");

        // enable handling of body
        router.route().handler(BodyHandler.create());
        router.route().handler(corsHandler);
        router.route().handler(this::handleAccessLogging);

        // publish a payload to provided eventbus destination
        router.post("/api/eventbus/publish/:destination").handler(this::publish);

        // open up all for outbound and inbound traffic
        bridgeOptions = new BridgeOptions();
        bridgeOptions.addOutboundPermitted(new PermittedOptions().setAddressRegex(".*"));
        bridgeOptions.addInboundPermitted(new PermittedOptions().setAddressRegex(".*"));
//        sockJsHandler = SockJSHandler.create(vertx).bridge(bridgeOptions);   
         sockJsHandler = SockJSHandler.create(vertx);
         sockJsHandler.bridge(bridgeOptions, be -> {
            try {
                if (be.type() == BridgeEventType.SOCKET_CREATED) {
                    handleSocketOpenEvent(be);
                }
                else if(be.type() ==BridgeEventType.REGISTER) {
                    handleRegisterEvent(be);
                }
                else if(be.type() ==BridgeEventType.UNREGISTER) {
                    handleUnregisterEvent(be);
                }
                else if(be.type() ==BridgeEventType.SOCKET_CLOSED) {
                    handleSocketCloseEvent(be);
                }
            } catch (Exception e) {

            } finally {
                be.complete(true);
            }
        });
        router.route("/eventbus/*").handler(sockJsHandler);

        if(testPathEnabled){
            router.route("/" + testUrlPath + "/*").handler(StaticHandler.create(testFilePath).setCachingEnabled(cachingEnabled));
        }

        // create periodic task, pushing all current EventBusRegistrations
        vertx.setPeriodic(1000, handler -> {
            JsonObject obj =new JsonObject();
            obj.put("testMessage", "Periodic test message from server...");
            vertx.eventBus().publish("heartbeat-test", Json.encodePrettily(obj));
        });

        EventBus eb = vertx.eventBus();
        eb.consumer("client-test", message -> {
            log.info("Received message from client: " + Json.encodePrettily(message.body()) + " at " + System.currentTimeMillis());
        });

        HttpServerOptions httpOptions = new HttpServerOptions();
        if(sslEnabled){
                httpOptions.setSsl(true);
                httpOptions.setKeyStoreOptions(sslKeyStoreOptions);
        }

        log.info("starting web server on port: " + port);
        vertx
                .createHttpServer(httpOptions)
                .requestHandler(router::accept).listen(
                port,
                result -> {
                    if (result.succeeded()) {
                        setStarted(true);
                        log.info("Server started and ready to accept requests");
                        fut.complete();
                    } else {
                        setStarted(false);
                        fut.fail(result.cause());
                    }
                }
        );
    }

    private void handleSocketOpenEvent(BridgeEvent be){
        String host =be.socket().remoteAddress().toString();
        String localAddress = be.socket().localAddress().toString();
        log.info("Socket connection opened! Host: " + host + " Local address: " + localAddress);
    }

    private void handleRegisterEvent(BridgeEvent be){
        String host =be.socket().remoteAddress().toString();
        String localAddress = be.socket().localAddress().toString();
        String address = be.getRawMessage().getString("address").trim();
        log.info("Eventbus register event! Address: " + address + " Host: " + host + " Local address: " + localAddress);
    }

    private void handleUnregisterEvent(BridgeEvent be){
        String host =be.socket().remoteAddress().toString();
        String localAddress = be.socket().localAddress().toString();
        String address = be.getRawMessage().getString("address").trim();
        log.info("Eventbus unregister event! Address: " + address + " Host: " + host + " Local address: " + localAddress);
    }

    private void handleSocketCloseEvent(BridgeEvent be){
        String host =be.socket().remoteAddress().toString();
        String localAddress = be.socket().localAddress().toString();
        log.info("Socket connection closed! Host: " + host + " Local address: " + localAddress);
    }

    //Method handles logging at custom level for access logging to files
    private void handleAccessLogging(RoutingContext routingContext){
        Marker accessMarker = MarkerFactory.getMarker("ACCESS");

        if(routingContext.normalisedPath().contains("/api")){
            log.info(accessMarker, "Api access log: request= " + routingContext.normalisedPath() + " source=" + routingContext.request().remoteAddress());
        }
        else{
            log.info(accessMarker, "Web access log: path= " + routingContext.normalisedPath() + " source= " + routingContext.request().remoteAddress());
        }

        routingContext.next();
    }

    /**
     * Accept a payload (anything) and publish to the provided destination
     *
     * @param routingContext
     */
    private void publish(RoutingContext routingContext) {
        String destination = routingContext.request().getParam("destination");
        String payload = routingContext.getBodyAsString();
        if ((destination == null) || (payload == null)) {
            Exception e = new Exception("Missing arguments");
            routingContext.response().setStatusCode(406);
            routingContext.fail(e);
        } else {
            log.info("API Call -> Publishing to destination: " + destination + " payload: " + payload);
            vertx.eventBus().publish(destination, payload);
            routingContext
                    .response()
                    .setStatusCode(200)
                    .putHeader("content-type", "application/json; charset=utf-8")
                    .end(payload);
        }
    }

    public boolean isStarted() {
        return started;
    }

    public void setStarted(boolean started) {
        this.started = started;
    }

    public int getPort() {
        return port;
    }

    public void setPort(int port) {
        this.port = port;
    }

    public Router getRouter(){
        if(router == null){
            router = Router.router(vertx);
        }
        return router;
    }

    public void setRouter(Router router){
        this.router = router;
    }

    public void setSslOptions(JksOptions keyStoreOptions, JksOptions trustStoreOptions) {
        this.sslKeyStoreOptions = keyStoreOptions;
        this.sslTrustStoreOptions = trustStoreOptions;
    }
}

可以通过执行以下操作解决此错误:

  1. 在 Java verticle 中,将 Eventbus 处理程序移动到顶部,在任何其他处理程序之前。我相信 BodyHandler 或 CorsHandler 搞砸了并导致 500 错误。

    ...
    router.route("/eventbus/*").handler(sockJsHandler);
    ...
    // enable handling of body
    router.route().handler(BodyHandler.create());
    router.route().handler(corsHandler);
    router.route().handler(this::handleAccessLogging);
    
    // publish a payload to provided eventbus destination
    router.post("/api/eventbus/publish/:destination").handler(this::publish);