如何在另一个线程的函数中抛出异常?

How to throw an exception in a function from another thread?

我遇到了一个非常奇怪的问题:基本上,我在 java 中实现了一个服务器流式传输 grpc 客户端,并且我将微配置文件用作 library/structure。 Microprofiles 在自动重试和回退方面有非常方便的拦截器,但它依赖于拦截注释函数中抛出的异常,否则不会触发重试。

对于单一的 grpc 调用,这工作得很好,它的工作方式与普通的 REST 调用非常相似。但是当客户端到 grpc 服务器端流时,protobuf 将创建它自己的线程并请求回调来处理 onNext、onError 和 onCompleted。因此,当在流范围内调用 onError 时,抛出的任何异常都不会发送回用于启动流的任何函数,因此不会抛出异常来触发 @Retry。

无法更改异步流的处理方式,也无法更改@Retry 的触发方式。 grpc为生成代码,@Retry触发器基于microprofile库

示例:

  @Retry(
          retryOn = {IOException.class, TimeoutException.class, StatusRuntimeException.class},
          maxDuration = 10,
          durationUnit =  ChronoUnit.SECONDS,
          maxRetries = 1,
          delay = 10,
          delayUnit = ChronoUnit.SECONDS
  )
  public void subscribeToLocations() {
    // --> Throwing an exception here triggers the @Retry <--
    SubscribeRequest locSubscribeRequest = SubscribeRequest.newBuilder().build();
    streamObserver = grpcStreamHandler();
    grpcBlockingstub.subscribeServerStreaming(locSubscribeRequest, streamObserver); // Can't change this.
  }

  private StreamObserver<SubscribeResponse> grpcStreamHandler() {
    return new StreamObserver<SubscribeResponse>() {
      @Override
      public void onNext(SubscribeResponse value) {
        // Handle grpc response
      }

      @Override
      public void onError(Throwable t) {
        // --> ERROR: Here, it should trigger the @Retry somehow. <--
      }

      @Override
      public void onCompleted() {
        // Handle oncomplete
      }
    };
  }

我试图找到解决这个问题的时间比我愿意承认的要长,但我仍然不知所措。有没有一种方法可以在一个范围内抛出异常而在另一个范围内结束?还有其他解决方案吗?

您在这里混合了同步和异步操作,因此您需要清楚地定义什么是 "success" 并且您不需要引发重试。这可能取决于您的逻辑,流媒体选项很少:

  • 至少收到一个对 onNext 的更新,然后完成或错误
  • onCompleted 收到或没有任何更新
  • 等等

定义 "success" 规则后,您可以将订阅包装到 Future 中并等待它在提供的超时时间内解决。如果未解决,则从原始调用中抛出异常。