两个 Quarkus 服务之间的非阻塞数据流(Vert.x with Mutiny in Java)
Non-blocking streaming of data between two Quarkus services (Vert.x with Mutiny in Java)
更新!
在解决了一些与仍然是关于服务之间的非阻塞流的主要问题无关的问题后,我修复了示例代码中的小错误。
背景资料:
我正在 Quarkus 下移植 Spring WebFlux 服务。该服务对多个庞大的数据集进行长时间搜索,returns 部分结果在 Flux (text/event-stream) 中可用。
问题:
现在我正在尝试在 Quarkus 下将 Mutiny Multi 与 Vert.x 一起使用,但无法弄清楚消费者服务如何在不阻塞的情况下接收此流。
在所有示例中,消费者要么是一个 JS 前端页面,要么生产者的内容类型是 application/json,这似乎是 bluck,直到 Multi 完成后再将其发送到一个 JSON 对象(其中在我的申请中没有任何意义)。
问题:
- 如何使用 Mutiny 风格的 Vert.x WebClient 接收 text/event-stream?
- 如果问题是 WebClient 无法接收连续流:在两个 Quarkus 服务之间传输数据的标准方法是什么?
这是一个简化的例子
测试实体
public class SearchResult implements Serializable {
private String content;
public SearchResult(String content) {
this.content = content;
}
//.. toString, getters and setters
}
Producer 1. 简单无限流 -> 挂起
@GET
@Path("/search")
@Produces(MediaType.SERVER_SENT_EVENTS)
@SseElementType(MediaType.APPLICATION_JSON)
public Multi<SearchResult> getResults() {
return Multi.createFrom().ticks().every(Duration.ofSeconds(2) .onItem().transform(n -> new SearchResult(n.toString()));
}
Producer 2. 使用 Vertx Paths 无限流 -> 挂起
@Route(path = "/routed", methods = HttpMethod.GET)
public Multi<SearchResult> getSrStreamRouted(RoutingContext context) {
log.info("routed run");
return ReactiveRoutes.asEventStream(Multi.createFrom().ticks().every(Duration.ofSeconds(2))
.onItem().transform(n -> new SearchResult(n.toString()));
}
生产者 3. 简单的有限流 -> 块直到完成
@GET
@Path("/search")
@Produces(MediaType.SERVER_SENT_EVENTS)
@SseElementType(MediaType.APPLICATION_JSON)
public Multi<SearchResult> getResults() {
return Multi.createFrom().ticks().every(Duration.ofSeconds(2))
.transform().byTakingFirstItems(5)
.onItem().transform(n -> new SearchResult(n.toString()));
}
消费者:
在生产者和消费者方面尝试了多种不同的解决方案,但在每种情况下,流都会阻塞直到完成或无限期挂起而不会为无限流传输数据。我用 httpie 得到了相同的结果。这是最新的迭代:
WebClientOptions webClientOptions = new WebClientOptions().setDefaultHost("localhost").setDefaultPort(8182);
WebClient client = WebClient.create(vertx, webClientOptions);
client.get("/string")
.send()
.onFailure().invoke(resp -> log.error("error: " + resp))
.onItem().invoke(resp -> log.info("result: " + resp.statusCode()))
.toMulti()
.subscribe().with(r -> log.info(String.format("Subscribe: code:%d body:%s",r.statusCode(), r.bodyAsString())));
Vert.x Web 客户端不能与 SSE 一起工作(没有配置)。
来自 https://vertx.io/docs/vertx-web-client/java/:
Responses are fully buffered, use BodyCodec.pipe to pipe the response to a write stream
等待响应完成。您可以使用 raw Vert.x HTTP 客户端或使用 pipe
编解码器。 https://vertx.io/docs/vertx-web-client/java/#_decoding_responses.
上给出了示例
或者,您可以使用 SSE 客户端,例如:
https://github.com/quarkusio/quarkus-quickstarts/blob/master/kafka-quickstart/src/test/java/org/acme/kafka/PriceResourceTest.java#L27-L34
更新!
在解决了一些与仍然是关于服务之间的非阻塞流的主要问题无关的问题后,我修复了示例代码中的小错误。
背景资料:
我正在 Quarkus 下移植 Spring WebFlux 服务。该服务对多个庞大的数据集进行长时间搜索,returns 部分结果在 Flux (text/event-stream) 中可用。
问题:
现在我正在尝试在 Quarkus 下将 Mutiny Multi 与 Vert.x 一起使用,但无法弄清楚消费者服务如何在不阻塞的情况下接收此流。
在所有示例中,消费者要么是一个 JS 前端页面,要么生产者的内容类型是 application/json,这似乎是 bluck,直到 Multi 完成后再将其发送到一个 JSON 对象(其中在我的申请中没有任何意义)。
问题:
- 如何使用 Mutiny 风格的 Vert.x WebClient 接收 text/event-stream?
- 如果问题是 WebClient 无法接收连续流:在两个 Quarkus 服务之间传输数据的标准方法是什么?
这是一个简化的例子
测试实体
public class SearchResult implements Serializable {
private String content;
public SearchResult(String content) {
this.content = content;
}
//.. toString, getters and setters
}
Producer 1. 简单无限流 -> 挂起
@GET
@Path("/search")
@Produces(MediaType.SERVER_SENT_EVENTS)
@SseElementType(MediaType.APPLICATION_JSON)
public Multi<SearchResult> getResults() {
return Multi.createFrom().ticks().every(Duration.ofSeconds(2) .onItem().transform(n -> new SearchResult(n.toString()));
}
Producer 2. 使用 Vertx Paths 无限流 -> 挂起
@Route(path = "/routed", methods = HttpMethod.GET)
public Multi<SearchResult> getSrStreamRouted(RoutingContext context) {
log.info("routed run");
return ReactiveRoutes.asEventStream(Multi.createFrom().ticks().every(Duration.ofSeconds(2))
.onItem().transform(n -> new SearchResult(n.toString()));
}
生产者 3. 简单的有限流 -> 块直到完成
@GET
@Path("/search")
@Produces(MediaType.SERVER_SENT_EVENTS)
@SseElementType(MediaType.APPLICATION_JSON)
public Multi<SearchResult> getResults() {
return Multi.createFrom().ticks().every(Duration.ofSeconds(2))
.transform().byTakingFirstItems(5)
.onItem().transform(n -> new SearchResult(n.toString()));
}
消费者:
在生产者和消费者方面尝试了多种不同的解决方案,但在每种情况下,流都会阻塞直到完成或无限期挂起而不会为无限流传输数据。我用 httpie 得到了相同的结果。这是最新的迭代:
WebClientOptions webClientOptions = new WebClientOptions().setDefaultHost("localhost").setDefaultPort(8182);
WebClient client = WebClient.create(vertx, webClientOptions);
client.get("/string")
.send()
.onFailure().invoke(resp -> log.error("error: " + resp))
.onItem().invoke(resp -> log.info("result: " + resp.statusCode()))
.toMulti()
.subscribe().with(r -> log.info(String.format("Subscribe: code:%d body:%s",r.statusCode(), r.bodyAsString())));
Vert.x Web 客户端不能与 SSE 一起工作(没有配置)。 来自 https://vertx.io/docs/vertx-web-client/java/:
Responses are fully buffered, use BodyCodec.pipe to pipe the response to a write stream
等待响应完成。您可以使用 raw Vert.x HTTP 客户端或使用 pipe
编解码器。 https://vertx.io/docs/vertx-web-client/java/#_decoding_responses.
或者,您可以使用 SSE 客户端,例如: https://github.com/quarkusio/quarkus-quickstarts/blob/master/kafka-quickstart/src/test/java/org/acme/kafka/PriceResourceTest.java#L27-L34