集群 Vert.x 环境中的 SockJS 连接

SockJS connections in a clustered Vert.x environment

vertx 应用程序在 Docker 个容器中运行,在两个 EC2 实例上并且是集群的。

集群是通过hazelcast-aws插件实现的,应用程序是这样启动的:

docker run --name ... -p ... \
--network ... \
-v ... \
-d ... \
-c 'exec java \
-Dvertx.eventBus.options.setClustered=true \
-Dvertx.eventBus.options.setClusterPort=15701 \
-jar ... -conf ... \
-cluster'

没有以编程方式设置任何与集群相关的内容。

客户端在第一次请求时打开一个套接字,并将其用于以后的类似请求。
每个请求将:

  1. 通过向事件总线发布消息向服务器发起异步请求
  2. 在事件总线上注册一个消费者来处理上面的结果, 并传递了对套接字连接的引用,它应该将结果发送到

由于 vertx 在集群时默认执行循环并且有两个实例,这意味着任何实例都会收到每隔一条消息(来自上面的 1.),并使仅连接到一个实例的客户端接收到恰好一半所有预期的响应。

我想这是因为,即使已注册的消费者有对套接字对象的引用,它也不能使用它,因为它是在不同的 node/webserver.

上创建的

这是否正确?有没有一种方法可以将 100% 的消息发送到客户端,仅连接到一个节点,而无需引入 RabbitMQ 之类的东西?

这是 SockJS 处理程序代码:

SockJSHandler sockJSHandler = SockJSHandler.create(vertx, new SockJSHandlerOptions());
sockJSHandler.socketHandler(socket -> {
    SecurityService securityService = (SecurityService) ServiceFactory.getService(SecurityService.class);
    if (securityService.socketHeadersSecurity(socket)) {
        socket.handler(socketMessage -> {
            try {
                LOGGER.trace("socketMessage: " + socketMessage);
                Socket socket = Json.decodeValue(socketMessage.toString(), Socket.class);
                Report report = socket.getReport();
                if (report != null) {
                    Account accountRequest = socket.getAccount();
                    Account accountDatabase = accountRequest == null ? null
                            : ((AccountService) ServiceFactory.getService(AccountService.class)).getById(accountRequest.getId());
                    Response result = securityService.socketReportSecurity(accountRequest, accountDatabase, report) ?
                            ((ReportService) ServiceFactory.getService(ReportService.class)).createOrUpdateReport(report, accountDatabase)
                            : new Response(Response.unauthorized);
                    if (Response.success.equals(result.getResponse())) {
                        //register a consumer
                        String consumerName = "report.result." + Timestamp.from(ClockFactory.getClock().instant());
                        vertx.eventBus().consumer(consumerName, message -> {
                            Response executionResult;
                            if ("success".equals(message.body())) {
                                try {
                                    Path csvFile = Paths.get(config.getString(Config.reportPath.getConfigName(), Config.reportPath.getDefaultValue())
                                            + "/" + ((Report) result.getPayload()).getId() + ".csv");
                                    executionResult = new Response(new JsonObject().put("csv", new String(Files.readAllBytes(csvFile))));
                                } catch (IOException ioEx) {
                                    executionResult = new Response(new Validator("Failed to read file.", ioEx.getMessage(), null, null));
                                    LOGGER.error("Failed to read file.", ioEx);
                                }
                            } else {
                                executionResult = new Response(new Validator("Report execution failed", (String)message.body(), null, null));
                            }
                            //send second message to client
                            socket.write(Json.encode(executionResult));
                            vertx.eventBus().consumer(consumerName).unregister();
                        });
                        //order report execution
                        vertx.eventBus().send("report.request", new JsonObject()
                                .put("reportId", ((Report) result.getPayload()).getId())
                                .put("consumerName", consumerName));
                    }
                    //send first message to client
                    socket.write(Json.encode(result));
                } else {
                    LOGGER.info("Insufficient data sent over socket: " + socketMessage.toString());
                    socket.end();
                }
            } catch (DecodeException dEx) {
                LOGGER.error("Error decoding message.", dEx);
                socket.end();
            }
        });
    } else {
        LOGGER.info("Illegal socket connection attempt from: " + socket.remoteAddress());
        socket.end();
    }
});
mainRouter.route("/websocket/*").handler(sockJSHandler);

有趣的是,当 运行 两个节点集群在本地主机上时,客户端获得 100% 的结果。

编辑: 这不是 SockJS,而是配置问题。

Since vertx does round robin by default when clustered and there are two instances, this means any instance gets every other message (from 1., above) and makes the client, which connects to one instance only, receive exactly half of all expected responses.

这个假设只是部分正确。 Vert.x 进行循环,是的,但这意味着每个实例将获得一半的连接,而不是一半的消息。

连接建立后,其所有消息将到达一个实例。

所以这个:

Would that be correct and is there a way to get 100% of messages to the client, connected to just one node, without introducing things like RabbitMQ?

已经发生了。