returns Mono 基于 Java 中可变属性的方法的线程安全

Thread safety for method that returns Mono based on mutable attribute in Java

在我的 Spring 引导应用程序中,我有一个组件应该监视另一个外部系统的健康状态。该组件还提供了一个 public 反应链可以订阅的方法,以等待外部系统启动。

@Component
public class ExternalHealthChecker {
  private static final Logger LOG = LoggerFactory.getLogger(ExternalHealthChecker.class);

  private final WebClient externalSystemWebClient = WebClient.builder().build(); // config omitted

  private volatile boolean isUp = true;
  private volatile CompletableFuture<String> completeWhenUp = new CompletableFuture<>();

  @Scheduled(cron = "0/10 * * ? * *")
  private void checkExternalSystemHealth() {
    webClient.get() //
        .uri("/health") //
        .retrieve() //
        .bodyToMono(Void.class) //
        .doOnError(this::handleHealthCheckError) //
        .doOnSuccess(nothing -> this.handleHealthCheckSuccess()) //
        .subscribe(); //
  }

  private void handleHealthCheckError(final Throwable error) {
    if (this.isUp) {
      LOG.error("External System is now DOWN. Health check failed: {}.", error.getMessage());
    }
    this.isUp = false;
  }

  private void handleHealthCheckSuccess() {
  // the status changed from down -> up, which has to complete the future that might be currently waited on  
  if (!this.isUp) {
      LOG.warn("External System is now UP again.");
      this.isUp = true;
      this.completeWhenUp.complete("UP");
      this.completeWhenUp = new CompletableFuture<>();
    }
  }


  public Mono<String> waitForExternalSystemUPStatus() {
    if (this.isUp) {
      LOG.info("External System is already UP!");
      return Mono.empty();
    } else {
      LOG.warn("External System is DOWN. Requesting process can now wait for UP status!");
      return Mono.fromFuture(completeWhenUp);
    }
  }
}

方法 waitForExternalSystemUPStatus 是 public 并且可以从许多不同的线程中调用。这背后的想法是为应用程序中的一些反应通量链提供一种暂停处理直到外部系统启动的方法。当外部系统关闭时,这些链无法处理它们的元素。

someFlux
  .doOnNext(record -> LOG.info("Next element")
  .delayUntil(record -> externalHealthChecker.waitForExternalSystemUPStatus())
  ... // starting processing

这里的问题是我无法真正理解这段代码的哪一部分需要同步。我认为多个线程同时调用 waitForExternalSystemUPStatus 应该没有问题,因为这个方法没有写任何东西。所以我觉得这个方法不需要同步。但是,用 @Scheduled 注释的方法也会在它自己的线程上 运行 并且实际上会写入 isUp 的值并且还可能将 completeWhenUp 的引用更改为新的、未完成的未来实例。我用 volatile 标记了这两个可变属性,因为从阅读 Java 中的这个关键字后,我觉得它有助于确保读取这两个值的线程看到最新值。但是,我不确定我是否还需要在部分代码中添加 synchronized 关键字。我也不确定 synchronized 关键字是否与反应器代码配合得很好,我很难找到这方面的信息。也许还有一种以更完整、反应性的方式提供 ExternalHealthChecker 功能的方法,但我想不出任何方法。

我强烈反对这种方法。像这样的线程代码的问题是它变得非常难以理解和推理。我认为你 至少 需要同步 handleHealthCheckSuccess()waitForExternalSystemUPStatus() 中引用你的 completeWhenUp 字段的部分,否则你可能会有种族危险在你手上(只有一个写给它,但在写完之后它可能会被乱序读取)——但很可能还有其他我遗漏的东西,如果是的话,它可能会显示为这些烦人的“之一”在一百万”类型的错误中,几乎不可能确定。

尽管如此,应该有一种更可靠、更简单的方法来实现这一点。我不使用 Spring 调度程序,而是在按如下方式创建 ExternalHealthChecker 组件时创建一个流量:

healthCheckStream = Flux.interval(Duration.ofMinutes(10))
        .flatMap(i ->
                webClient.get().uri("/health")
                        .retrieve()
                        .bodyToMono(String.class)
                        .map(s -> true)
                        .onErrorResume(e -> Mono.just(false)))
        .cache(1);

...其中 healthCheckStream 是类型 Flux<Boolean> 的字段。 (注意它不需要是易失性的,因为你永远不会替换它所以跨线程的担忧不适用 - 它是同一个流,根据健康检查状态每 10 分钟更新一次不同的结果,无论线程您将从中访问它。)

这基本上每 10 分钟创建一个健康检查响应值流,始终缓存最新的响应,并将其变成热源。这意味着“在您订阅之前什么都不会发生”在这种情况下不适用 - 通量将立即开始执行,任何进入任何线程的新订阅者将始终获得最新结果,无论是通过还是失败. handleHealthCheckSuccess()handleHealthCheckError()isUpcompleteWhenUp 都是多余的,它们可以去 - 然后你的 waitForExternalSystemUPStatus() 可以变成一行:

return healthCheckStream.filter(x -> x).next();

...然后工作完成,您可以从任何地方调用它,您将拥有一个 Mono 只有在系统启动时才会完成的