Handle Subscription in Vert.x GraphQL

I tried to use the Vert.x HttpClient/WebClient to consume a GraphQL subscription, but it did not work as expected.

The relevant server-side code (written with Vert.x Web GraphQL) is shown below. When a comment is added, it calls onNext to send the comment to the Publisher.

    public VertxDataFetcher<UUID> addComment() {
        return VertxDataFetcher.create((DataFetchingEnvironment dfe) -> {
            var commentInputArg = dfe.getArgument("commentInput");
            var jacksonMapper = DatabindCodec.mapper();
            var input = jacksonMapper.convertValue(commentInputArg, CommentInput.class);
            // Once the comment is saved, load it by id and push it to the subject.
            return this.posts.addComment(input)
                .onSuccess(id -> this.posts.getCommentById(id.toString())
                    .onSuccess(c -> subject.onNext(c)));
        });
    }

    private BehaviorSubject<Comment> subject = BehaviorSubject.create();

    public  DataFetcher<Publisher<Comment>> commentAdded() {
        return (DataFetchingEnvironment dfe) -> {
            ConnectableObservable<Comment> connectableObservable = subject.share().publish();
            connectableObservable.connect();
            return connectableObservable.toFlowable(BackpressureStrategy.BUFFER);
        };
    }
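
The flow in the server code above (a mutation pushes each new comment into a subject, and the subscription hands that stream to the client) can be sketched in plain Java without RxJava. The `Broadcaster` class and the `demo` method below are hypothetical stand-ins for illustration only. Unlike a `BehaviorSubject`, this sketch does not replay the latest value to late subscribers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

public class CommentBroadcastSketch {

    // Hypothetical, minimal stand-in for the RxJava subject in the post:
    // it forwards each item to every listener registered at the time of the call.
    static final class Broadcaster<T> {
        private final List<Consumer<T>> listeners = new CopyOnWriteArrayList<>();

        void subscribe(Consumer<T> listener) {
            listeners.add(listener);
        }

        void onNext(T item) {
            for (Consumer<T> listener : listeners) {
                listener.accept(item);
            }
        }
    }

    // Runs one subscribe-then-publish round and returns what the subscriber saw.
    static List<String> demo() {
        Broadcaster<String> subject = new Broadcaster<>();
        List<String> received = new ArrayList<>();

        // Subscription side: plays the role of the stream returned by commentAdded().
        subject.subscribe(received::add);

        // Mutation side: plays the role of addComment() calling subject.onNext(c).
        subject.onNext("first comment");
        subject.onNext("second comment");

        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The point of the sketch is ordering: a subscriber only sees comments that are published after it has subscribed, so the subscription must be established before the mutation runs.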

On the client side I mix HttpClient and WebClient. Most of the time I prefer WebClient because it makes form posts easier, but it does not seem to offer WebSocket connections.

So for the WebSocket part I fall back to HttpClient.


    var options = new HttpClientOptions()
        .setDefaultHost("localhost")
        .setDefaultPort(8080);

    var httpClient = vertx.createHttpClient(options);
    httpClient.webSocket("/graphql")
        .onSuccess(ws -> {
            // Log every text frame received from the server.
            ws.textMessageHandler(text -> log.info("web socket message handler:{}", text));

            // Handshake frame that opens the subscription session.
            JsonObject messageInit = new JsonObject()
                .put("type", "connection_init")
                .put("id", "1");

            // Frame that starts the commentAdded subscription with operation id 1.
            JsonObject message = new JsonObject()
                .put("payload", new JsonObject()
                    .put("query", "subscription onCommentAdded { commentAdded { id content } }"))
                .put("type", "start")
                .put("id", "1");

            // Both frames are written immediately, without waiting for a reply.
            ws.write(messageInit.toBuffer());
            ws.write(message.toBuffer());
        })
        .onFailure(e -> log.error("error: {}", e));
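
The two frames sent above follow the Apollo `subscriptions-transport-ws` message shapes: `connection_init` opens the session and `start` begins an operation identified by `id`. As a rough sketch of those shapes using only the JDK, the hypothetical helper class below builds the same frames as strings. The class name, method names, and the simplified escaping are assumptions for illustration. The post itself uses `JsonObject`, which is the better choice in real code.

```java
public class ApolloWsFrames {

    // Quotes a value as a JSON string. Only backslashes and double quotes are escaped,
    // which is enough for this sketch; a real client would use a JSON library.
    static String quote(String value) {
        return "\"" + value.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    // First frame the client sends after the socket opens.
    static String connectionInit() {
        return "{\"type\":\"connection_init\"}";
    }

    // Starts a subscription; the id ties later server messages to this operation.
    static String start(String id, String query) {
        return "{\"id\":" + quote(id)
            + ",\"type\":\"start\""
            + ",\"payload\":{\"query\":" + quote(query) + "}}";
    }

    // Ends the subscription that was started with the same id.
    static String stop(String id) {
        return "{\"id\":" + quote(id) + ",\"type\":\"stop\"}";
    }

    public static void main(String[] args) {
        System.out.println(connectionInit());
        System.out.println(start("1", "subscription onCommentAdded { commentAdded { id content } }"));
        System.out.println(stop("1"));
    }
}
```

In the same protocol the server answers with message types such as `connection_ack`, `data`, `error`, and `complete`, which is what the `textMessageHandler` above would be expected to log once the subscription is running.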


    // The client here is a WebClient.
    client.post("/graphql")
        .sendJson(Map.of(
            "query", "mutation addComment($input:CommentInput!){ addComment(commentInput:$input) }",
            "variables", Map.of(
                "input", Map.of(
                    "postId", id,
                    "content", "comment content of post id" + LocalDateTime.now()
                )
            )
        ))
        .onSuccess(
            data -> log.info("data of addComment: {}", data.bodyAsString())
        )
        .onFailure(e -> log.error("error: {}", e));

When I run the client and the server, the comment is added, but the WebSocket client does not print any WebSocket messages. The server console shows this message:

2021-06-25 18:45:44,356 DEBUG [vert.x-eventloop-thread-1] graphql.GraphQL: Execution '182965bb-80de-416d-b5fe-fe157ab87f1c' completed with zero errors

It seems the backend commentAdded data fetcher is never invoked.

The complete code for the GraphQL client and server is shared on my GitHub.

After reading some of the test code in Vert.x Web GraphQL, I found that I had to add a ConnectionInitHandler to the ApolloWSHandler, like this:

    .connectionInitHandler(connectionInitEvent -> {
        JsonObject payload = connectionInitEvent.message().content().getJsonObject("payload");
        if (payload != null && payload.containsKey("rejectMessage")) {
            connectionInitEvent.fail(payload.getString("rejectMessage"));
            return;
        }
        connectionInitEvent.complete(payload);
    })

When the client sends the connection_init message, connectionInitEvent.complete must be called before communication between the client and the server can start.
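
The fix above is on the server. A complementary and purely optional client-side precaution, which the post does not use, is to hold back `start` frames until the server has answered `connection_init` with `connection_ack`. The sketch below shows that ordering with only the JDK. `HandshakeGate` and its methods are hypothetical names, and the transport is abstracted as a `Consumer<String>` that could wrap something like `ws.writeTextMessage` in a real client.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

public class HandshakeGate {
    private final Consumer<String> send;
    private final Queue<String> pending = new ArrayDeque<>();
    private boolean acked = false;

    public HandshakeGate(Consumer<String> send) {
        this.send = send;
    }

    // Call once the socket is open: sends the handshake frame.
    public void open() {
        send.accept("{\"type\":\"connection_init\"}");
    }

    // Sends the start frame right away if the handshake is done, otherwise queues it.
    public void subscribe(String startFrame) {
        if (acked) {
            send.accept(startFrame);
        } else {
            pending.add(startFrame);
        }
    }

    // Feed the "type" field of each incoming server message here.
    public void onServerMessageType(String type) {
        if ("connection_ack".equals(type) && !acked) {
            acked = true;
            // Flush everything that was waiting for the handshake to finish.
            while (!pending.isEmpty()) {
                send.accept(pending.poll());
            }
        }
    }

    public static void main(String[] args) {
        List<String> wire = new ArrayList<>();
        HandshakeGate gate = new HandshakeGate(wire::add);

        gate.open();
        gate.subscribe("{\"id\":\"1\",\"type\":\"start\"}");
        System.out.println("frames sent before ack: " + wire.size());

        gate.onServerMessageType("connection_ack");
        System.out.println("frames sent after ack: " + wire.size());
    }
}
```

With this gate in place, a missing `connection_ack` shows up as a queued `start` frame that never goes out, which makes a handshake that was never completed on the server easier to spot.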