在 vertx GraphQL 中处理订阅
Handle Subscription in vertx GraphQL
我尝试使用 Vertx HttpClient/WebClient 来使用 GraphQLSubscritpion
但它没有按预期工作。
服务器端相关代码(使用Vertx Web GraphQL编写)如下,添加评论时,触发onNext
将评论发送到Publisher
。
public VertxDataFetcher<UUID> addComment() {
return VertxDataFetcher.create((DataFetchingEnvironment dfe) -> {
var commentInputArg = dfe.getArgument("commentInput");
var jacksonMapper = DatabindCodec.mapper();
var input = jacksonMapper.convertValue(commentInputArg, CommentInput.class);
return this.posts.addComment(input)
.onSuccess(id -> this.posts.getCommentById(id.toString())
.onSuccess(c ->subject.onNext(c)));
});
}
private BehaviorSubject<Comment> subject = BehaviorSubject.create();
public DataFetcher<Publisher<Comment>> commentAdded() {
return (DataFetchingEnvironment dfe) -> {
ConnectableObservable<Comment> connectableObservable = subject.share().publish();
connectableObservable.connect();
return connectableObservable.toFlowable(BackpressureStrategy.BUFFER);
};
}
在客户端,我混合使用HttpClient/WebClient,大多数时候,我想使用WebClient,它更容易处理表单post。但它似乎没有 WebSocket
连接。
所以 websocket 部分正在返回使用 HttpClient。
var options = new HttpClientOptions()
.setDefaultHost("localhost")
.setDefaultPort(8080);
var httpClient = vertx.createHttpClient(options);
httpClient.webSocket("/graphql")
.onSuccess(ws -> {
ws.textMessageHandler(text -> log.info("web socket message handler:{}", text));
JsonObject messageInit = new JsonObject()
.put("type", "connection_init")
.put("id", "1");
JsonObject message = new JsonObject()
.put("payload", new JsonObject()
.put("query", "subscription onCommentAdded { commentAdded { id content } }"))
.put("type", "start")
.put("id", "1");
ws.write(messageInit.toBuffer());
ws.write(message.toBuffer());
})
.onFailure(e -> log.error("error: {}", e));
// this client here is WebClient.
client.post("/graphql")
.sendJson(Map.of(
"query", "mutation addComment($input:CommentInput!){ addComment(commentInput:$input) }",
"variables", Map.of(
"input", Map.of(
"postId", id,
"content", "comment content of post id" + LocalDateTime.now()
)
)
))
.onSuccess(
data -> log.info("data of addComment: {}", data.bodyAsString())
)
.onFailure(e -> log.error("error: {}", e));
当运行客户端和服务器时,添加注释,但WebSocket客户端不打印任何关于websocket消息的信息。在服务器控制台上,有这样的消息。
2021-06-25 18:45:44,356 DEBUG [vert.x-eventloop-thread-1] graphql.GraphQL: Execution '182965bb-80de-416d-b5fe-fe157ab87f1c' completed with zero errors
似乎根本没有调用后端 commentAdded
datafetcher。
The complete codes 的 GraphQL 客户端和服务器在我的 Github.
上共享
看完some testing codes of Vertx Web GraphQL后,我发现我必须像这样在ApolloWSHandler
上添加ConnectionInitHandler
。
.connectionInitHandler(connectionInitEvent -> {
JsonObject payload = connectionInitEvent.message().content().getJsonObject("payload");
if (payload != null && payload.containsKey("rejectMessage")) {
connectionInitEvent.fail(payload.getString("rejectMessage"));
return;
}
connectionInitEvent.complete(payload);
}
)
当客户端发送connection_init
消息时,需要connectionInitEvent.complete
才能开始客户端与服务器之间的通信。
我尝试使用 Vertx HttpClient/WebClient 来使用 GraphQLSubscritpion
但它没有按预期工作。
服务器端相关代码(使用Vertx Web GraphQL编写)如下,添加评论时,触发onNext
将评论发送到Publisher
。
public VertxDataFetcher<UUID> addComment() {
return VertxDataFetcher.create((DataFetchingEnvironment dfe) -> {
var commentInputArg = dfe.getArgument("commentInput");
var jacksonMapper = DatabindCodec.mapper();
var input = jacksonMapper.convertValue(commentInputArg, CommentInput.class);
return this.posts.addComment(input)
.onSuccess(id -> this.posts.getCommentById(id.toString())
.onSuccess(c ->subject.onNext(c)));
});
}
private BehaviorSubject<Comment> subject = BehaviorSubject.create();
public DataFetcher<Publisher<Comment>> commentAdded() {
return (DataFetchingEnvironment dfe) -> {
ConnectableObservable<Comment> connectableObservable = subject.share().publish();
connectableObservable.connect();
return connectableObservable.toFlowable(BackpressureStrategy.BUFFER);
};
}
在客户端,我混合使用HttpClient/WebClient,大多数时候,我想使用WebClient,它更容易处理表单post。但它似乎没有 WebSocket
连接。
所以 websocket 部分正在返回使用 HttpClient。
var options = new HttpClientOptions()
.setDefaultHost("localhost")
.setDefaultPort(8080);
var httpClient = vertx.createHttpClient(options);
httpClient.webSocket("/graphql")
.onSuccess(ws -> {
ws.textMessageHandler(text -> log.info("web socket message handler:{}", text));
JsonObject messageInit = new JsonObject()
.put("type", "connection_init")
.put("id", "1");
JsonObject message = new JsonObject()
.put("payload", new JsonObject()
.put("query", "subscription onCommentAdded { commentAdded { id content } }"))
.put("type", "start")
.put("id", "1");
ws.write(messageInit.toBuffer());
ws.write(message.toBuffer());
})
.onFailure(e -> log.error("error: {}", e));
// this client here is WebClient.
client.post("/graphql")
.sendJson(Map.of(
"query", "mutation addComment($input:CommentInput!){ addComment(commentInput:$input) }",
"variables", Map.of(
"input", Map.of(
"postId", id,
"content", "comment content of post id" + LocalDateTime.now()
)
)
))
.onSuccess(
data -> log.info("data of addComment: {}", data.bodyAsString())
)
.onFailure(e -> log.error("error: {}", e));
当运行客户端和服务器时,添加注释,但WebSocket客户端不打印任何关于websocket消息的信息。在服务器控制台上,有这样的消息。
2021-06-25 18:45:44,356 DEBUG [vert.x-eventloop-thread-1] graphql.GraphQL: Execution '182965bb-80de-416d-b5fe-fe157ab87f1c' completed with zero errors
似乎根本没有调用后端 commentAdded
datafetcher。
The complete codes 的 GraphQL 客户端和服务器在我的 Github.
上共享看完some testing codes of Vertx Web GraphQL后,我发现我必须像这样在ApolloWSHandler
上添加ConnectionInitHandler
。
.connectionInitHandler(connectionInitEvent -> {
JsonObject payload = connectionInitEvent.message().content().getJsonObject("payload");
if (payload != null && payload.containsKey("rejectMessage")) {
connectionInitEvent.fail(payload.getString("rejectMessage"));
return;
}
connectionInitEvent.complete(payload);
}
)
当客户端发送connection_init
消息时,需要connectionInitEvent.complete
才能开始客户端与服务器之间的通信。