RX Java 单身未从 Single.merge 返回
RX Java Single not returning from Single.merge
我有几个 api 呼叫(Rx 单打),我想合并成一个单打。我正在使用 Single.merge 尝试组合这些调用的结果,但是当我订阅响应时,我得到一个空数组,因为订阅已经发生。我调用 HealthChecker 期望订阅将 return 结果列表:
new HealthChecker(vertx)
.getHealthChecks(endpoints)
.subscribe(messages -> {
log.info("Completed health check {}", messages);
routingContext.response()
.putHeader("content-type", "text/json")
.end(messages.toString());
});
健康检查器 class 执行逻辑:
public class HealthChecker {
private static Logger log = LoggerFactory.getLogger(HealthChecker.class);
private Vertx vertx;
private WebClient client;
public HealthChecker(Vertx vertx) {
this.vertx = vertx;
client = WebClient.create(vertx);
}
public Single<List<String>> getHealthChecks(JsonArray endpoints) {
return Single.fromCallable(() -> {
List<Single<String>> healthChecks = endpoints
.stream()
.map(endpoint -> getHealthStatus(client, endpoint.toString()))
.collect(Collectors.toList());
return consumeHealthChecks(healthChecks).blockingGet();
});
}
private Single<List<String>> consumeHealthChecks(List<Single<String>> healthChecks) {
return Single.fromCallable(() -> {
List<String> messages = new ArrayList<>();
Single.merge(healthChecks)
.timeout(1500, TimeUnit.MILLISECONDS)
.subscribe(message -> {
log.info("Got health check {}", message);
messages.add(message);
}, error -> {
log.info("Timeout - could not get health check");
});
return messages;
});
}
private Single<String> getHealthStatus(WebClient client, String endpoint) {
log.info("getting endpoint {}", endpoint);
return client
.getAbs(endpoint)
.rxSend()
.map(HttpResponse::bodyAsString)
.map(response -> response);
}
}
我希望 return 值是一个列表,除了我得到的只是一个空列表,然后结果就出来了。这是日志:
09:12:06.235 [vert.x-eventloop-thread-1] INFO sys.health.HealthChecker - getting endpoint http://localhost:5000/status
09:12:06.241 [vert.x-eventloop-thread-1] INFO sys.health.HealthChecker - getting endpoint http://localhost:5001/status
09:12:06.241 [vert.x-eventloop-thread-1] INFO sys.health.HealthChecker - getting endpoint http://localhost:5002/status
09:12:06.241 [vert.x-eventloop-thread-1] INFO sys.health.HealthChecker - getting endpoint http://localhost:5003/status
09:12:06.241 [vert.x-eventloop-thread-1] INFO sys.health.HealthChecker - getting endpoint http://localhost:5004/status
09:12:06.300 [vert.x-eventloop-thread-1] INFO sys.health.HealthCheckVerticle - Completed health check []
09:12:06.688 [vert.x-eventloop-thread-1] INFO sys.health.HealthChecker - Got health check {"isHealthy":true}
09:12:06.844 [vert.x-eventloop-thread-1] INFO sys.health.HealthChecker - Got health check {"isHealthy":true}
09:12:06.898 [vert.x-eventloop-thread-1] INFO sys.health.HealthChecker - Got health check {"isHealthy":false}
09:12:07.072 [vert.x-eventloop-thread-1] INFO sys.health.HealthChecker - Got health check {"isHealthy":true}
09:12:07.255 [vert.x-eventloop-thread-1] INFO sys.health.HealthChecker - Got health check {"isHealthy":true}
为什么要使用 fromCallable
和 blockingGet
?此外,您在没有实际等待 运行 完成的情况下触发 merge
,因此列表为空。相反,撰写内部 Single
s:
public Single<List<String>> getHealthChecks(JsonArray endpoints) {
return Single.defer(() -> {
List<Single<String>> healthChecks = endpoints
.stream()
.map(endpoint -> getHealthStatus(client, endpoint.toString()))
.collect(Collectors.toList());
return consumeHealthChecks(healthChecks);
});
}
private Single<List<String>> consumeHealthChecks(List<Single<String>> healthChecks) {
return Single.merge(healthChecks)
.timeout(1500, TimeUnit.MILLISECONDS)
.toList();
}
您的问题在这里:
private Single<List<String>> consumeHealthChecks(List<Single<String>> healthChecks) {
return Single.fromCallable(() -> {
List<String> messages = new ArrayList<>();
Single.merge(healthChecks)
.timeout(1500, TimeUnit.MILLISECONDS)
.subscribe(message -> {
log.info("Got health check {}", message);
messages.add(message);
}, error -> {
log.info("Timeout - could not get health check");
});
return messages;
});
}
您正在创建一个空列表,然后从 lambda 返回它,这样从 consumeHealthChecks
返回的 Single 是空列表的 Single...
我猜你想做的是这样的:
private Single<List<String>> mergeHealthChecks(List<Single<String>> healthChecks) {
return Single.merge(healthChecks)
.timeout(1500, TimeUnit.MILLISECONDS);
}
然后像这样使用它:
private void consumeHealthChecks() {
Single<List<String>> healthChecks = new HealthChecker(vertx)
.getHealthChecks(endpoints);
mergeHealthChecks(healthChecks)
.subscribe(message -> {
log.info("Merged and consumed all health checks. Final health check: ", message);
}, error -> {
log.info("Timeout - could not merge and consume health checks");
});
}
请注意,当您使用 Single.merge
时,您只会在订阅成功回调中获得最终合并的结果,因此,如果您想在成功消费每条消息时记录它,您将想要在订阅调用之前使用 doOnSuccess
连接一个副作用调用来记录消息。
我有几个 api 呼叫(Rx 单打),我想合并成一个单打。我正在使用 Single.merge 尝试组合这些调用的结果,但是当我订阅响应时,我得到一个空数组,因为订阅已经发生。我调用 HealthChecker 期望订阅将 return 结果列表:
new HealthChecker(vertx)
.getHealthChecks(endpoints)
.subscribe(messages -> {
log.info("Completed health check {}", messages);
routingContext.response()
.putHeader("content-type", "text/json")
.end(messages.toString());
});
健康检查器 class 执行逻辑:
public class HealthChecker {
private static Logger log = LoggerFactory.getLogger(HealthChecker.class);
private Vertx vertx;
private WebClient client;
public HealthChecker(Vertx vertx) {
this.vertx = vertx;
client = WebClient.create(vertx);
}
public Single<List<String>> getHealthChecks(JsonArray endpoints) {
return Single.fromCallable(() -> {
List<Single<String>> healthChecks = endpoints
.stream()
.map(endpoint -> getHealthStatus(client, endpoint.toString()))
.collect(Collectors.toList());
return consumeHealthChecks(healthChecks).blockingGet();
});
}
private Single<List<String>> consumeHealthChecks(List<Single<String>> healthChecks) {
return Single.fromCallable(() -> {
List<String> messages = new ArrayList<>();
Single.merge(healthChecks)
.timeout(1500, TimeUnit.MILLISECONDS)
.subscribe(message -> {
log.info("Got health check {}", message);
messages.add(message);
}, error -> {
log.info("Timeout - could not get health check");
});
return messages;
});
}
private Single<String> getHealthStatus(WebClient client, String endpoint) {
log.info("getting endpoint {}", endpoint);
return client
.getAbs(endpoint)
.rxSend()
.map(HttpResponse::bodyAsString)
.map(response -> response);
}
}
我希望 return 值是一个列表,除了我得到的只是一个空列表,然后结果就出来了。这是日志:
09:12:06.235 [vert.x-eventloop-thread-1] INFO sys.health.HealthChecker - getting endpoint http://localhost:5000/status
09:12:06.241 [vert.x-eventloop-thread-1] INFO sys.health.HealthChecker - getting endpoint http://localhost:5001/status
09:12:06.241 [vert.x-eventloop-thread-1] INFO sys.health.HealthChecker - getting endpoint http://localhost:5002/status
09:12:06.241 [vert.x-eventloop-thread-1] INFO sys.health.HealthChecker - getting endpoint http://localhost:5003/status
09:12:06.241 [vert.x-eventloop-thread-1] INFO sys.health.HealthChecker - getting endpoint http://localhost:5004/status
09:12:06.300 [vert.x-eventloop-thread-1] INFO sys.health.HealthCheckVerticle - Completed health check []
09:12:06.688 [vert.x-eventloop-thread-1] INFO sys.health.HealthChecker - Got health check {"isHealthy":true}
09:12:06.844 [vert.x-eventloop-thread-1] INFO sys.health.HealthChecker - Got health check {"isHealthy":true}
09:12:06.898 [vert.x-eventloop-thread-1] INFO sys.health.HealthChecker - Got health check {"isHealthy":false}
09:12:07.072 [vert.x-eventloop-thread-1] INFO sys.health.HealthChecker - Got health check {"isHealthy":true}
09:12:07.255 [vert.x-eventloop-thread-1] INFO sys.health.HealthChecker - Got health check {"isHealthy":true}
为什么要使用 fromCallable
和 blockingGet
?此外,您在没有实际等待 运行 完成的情况下触发 merge
,因此列表为空。相反,撰写内部 Single
s:
public Single<List<String>> getHealthChecks(JsonArray endpoints) {
return Single.defer(() -> {
List<Single<String>> healthChecks = endpoints
.stream()
.map(endpoint -> getHealthStatus(client, endpoint.toString()))
.collect(Collectors.toList());
return consumeHealthChecks(healthChecks);
});
}
private Single<List<String>> consumeHealthChecks(List<Single<String>> healthChecks) {
return Single.merge(healthChecks)
.timeout(1500, TimeUnit.MILLISECONDS)
.toList();
}
您的问题在这里:
private Single<List<String>> consumeHealthChecks(List<Single<String>> healthChecks) {
return Single.fromCallable(() -> {
List<String> messages = new ArrayList<>();
Single.merge(healthChecks)
.timeout(1500, TimeUnit.MILLISECONDS)
.subscribe(message -> {
log.info("Got health check {}", message);
messages.add(message);
}, error -> {
log.info("Timeout - could not get health check");
});
return messages;
});
}
您正在创建一个空列表,然后从 lambda 返回它,这样从 consumeHealthChecks
返回的 Single 是空列表的 Single...
我猜你想做的是这样的:
private Single<List<String>> mergeHealthChecks(List<Single<String>> healthChecks) {
return Single.merge(healthChecks)
.timeout(1500, TimeUnit.MILLISECONDS);
}
然后像这样使用它:
private void consumeHealthChecks() {
Single<List<String>> healthChecks = new HealthChecker(vertx)
.getHealthChecks(endpoints);
mergeHealthChecks(healthChecks)
.subscribe(message -> {
log.info("Merged and consumed all health checks. Final health check: ", message);
}, error -> {
log.info("Timeout - could not merge and consume health checks");
});
}
请注意,当您使用 Single.merge
时,您只会在订阅成功回调中获得最终合并的结果,因此,如果您想在成功消费每条消息时记录它,您将想要在订阅调用之前使用 doOnSuccess
连接一个副作用调用来记录消息。