有人可以向我解释 gRPC StreamObserver.onError 的正确用法是什么吗?

Can someone explain to me what's the proper usage of gRPC StreamObserver.onError?

我正在尝试正确处理 gRPC 错误(Java、Spring-启动应用程序)。

基本上,我也需要将错误详细信息从 gRPC 服务器传输到客户端,但我发现很难理解 StreamObserver.onError();

的正确用法

方法文档说:

"Receives a terminating error from the stream. May only be called once and if called it must be the last method called. In particular if an exception is thrown by an implementation of onError no further calls to any method are allowed."

“不允许再调用” 是什么意思?在我维护的应用程序中,他们调用其他 gRPC 方法并获得 java.lang.IllegalStateException: call already closed,根据文档,这很好。

我想知道 - 我(开发人员)是否应该在收到错误后终止当前的 java 方法(usus gRPC 调用)?例如抛出异常以停止执行。或者预计 gRPC 将终止执行..(类似于从 gRPC 抛出异常)

基本上我该如何正确使用 onError() 以及如果调用它我应该期待和处理什么? 我需要解释一下它的用法和作用。

涉及两个 StreamObserver 实例。一种是针对入站方向,即 StreamObserver 实例 你实现 并传递给 gRPC 库。这是包含您如何处理响应的逻辑的 StreamObserver。另一个是出方向,就是调用RPC方法时gRPC库returns给你StreamObserver实例。这是您用来发送请求的 StreamObserver。大多数时候,这两个 StreamObserver 正在相互交互(例如,在全双工流式调用中,响应 StreamObserver 通常调用请求 StreamObserveronNext() 方法,这就是实现 ping-pong 行为的方式)。

"no further calls are allowed" 表示您不应该再在出站方向上呼叫 onNext(), onComplete() and/or onError() StreamObserver 当调用入站 StreamObserveronError() 方法时,即使您对入站 onError() 的实现抛出异常。由于入站 StreamObserver 是异步调用的,因此它与包含 StreamObserver 实现的方法无关。

例如:


public class HelloWorld {
  private final HelloWorldStub stub;
  private StreamObserver<HelloRequest> requestObserver;

  ...

  private void sendRequest(String message) {
    requestObserver.onNext(HelloRequest.newBuilder.setMessage(message).build());
  }

  public void start() {
    stub.helloWorld(new StreamObserver<HelloResponse> {
      @Override
      public void onNext(HelloResponse response) {
        sendRequest("hello from client");
        // Optionally you can call onCompleted() or onError() on 
        // the requestObserver to terminate the call.
      }

      @Override
      public void onCompleted() {
        // You should not call any method on requestObserver.
      }

      @Override
      public void onError(Throwable error) {
        // You should not call any method on requestObserver.
      }
    });
  }

}

start()方法无关

文档还提到你不应该做像

这样的事情
try {
  requestObserver.onCompleted();
} catch(RuntimeException e) {
  requestObserver.onError();
}

它主要用于用户自己的 StreamObserver 实现。 StreamObserver 由 gRPC 返回的永远不会抛出。

我提取了一个template for GRPC streaming which sort of abstracts away a lot of the GRPC boilerplate that also addresses the the logic for onError. In the DechunkingStreamObserver

我对 GRPC 流使用以下通用模式,这与

类似
META DATA DATA DATA META DATA DATA DATA

我将使用它的一个示例是采用一种形式并将其转换为另一种形式。

message SavedFormMeta {
  string id = 1;
}

message SavedFormChunk {
  oneof type {
    SavedFormMeta meta = 1;
    bytes data = 2;
  }
}

rpc saveFormDataStream(stream SavedFormChunk) returns (stream SavedFormChunk) {}

我使用一个标志来跟踪 inError 状态,以防止进一步处理并捕获 onNextonComplete 上的异常,这两个我都重定向到 onError它将错误转发到服务器端。

下面的代码提取 GRPC 语义并采用执行处理的 lamda。


/**
 * Dechunks a GRPC stream from the request and calls the consumer when a complete object is created.  This stops
 * further processing once an error has occurred.
 *
 * @param <T> entity type
 * @param <R> GRPC chunk message type
 * @param <S> GRPC message type for response streams
 */
class DechunkingStreamObserver<T, R, S> implements StreamObserver<R> {

    /**
     * This function takes the current entity state and the chunk and returns a copy of the combined result.  Note the combiner may modify the existing data, but may cause unexpected behaviour.
     */
    private final BiFunction<T, R, T> combiner;

    /**
     * A function that takes in the assembled object and the GRPC response observer.
     */
    private final BiConsumer<T, StreamObserver<S>> consumer;

    /**
     * Predicate that returns true if it is a meta chunk indicating a start of a new object.
     */
    private final Predicate<R> metaPredicate;

    /**
     * this function gets the meta chunk and supplies a new object.
     */
    private final Function<R, T> objectSupplier;

    /**
     * GRPC response observer.
     */
    private final StreamObserver<S> responseObserver;

    /**
     * Currently being processed entity.
     */
    private T current = null;

    /**
     * In error state.  Starts {@code false}, but once it is set to {@code true} it stops processing {@link #onNext(Object)}.
     */
    private boolean inError = false;

    /**
     * @param metaPredicate    predicate that returns true if it is a meta chunk indicating a start of a new object.
     * @param objectSupplier   this function gets the meta chunk and supplies a new object
     * @param combiner         this function takes the current entity state and the chunk and returns a copy of the combined result.  Note the combiner may modify the existing data, but may cause unexpected behaviour.
     * @param consumer         a function that takes in the assembled object and the GRPC response observer.
     * @param responseObserver GRPC response observer
     */
    DechunkingStreamObserver(
            final Predicate<R> metaPredicate,
            final Function<R, T> objectSupplier,
            final BiFunction<T, R, T> combiner,
            final BiConsumer<T, StreamObserver<S>> consumer,
            final StreamObserver<S> responseObserver) {

        this.metaPredicate = metaPredicate;
        this.objectSupplier = objectSupplier;
        this.combiner = combiner;
        this.consumer = consumer;
        this.responseObserver = responseObserver;
    }

    @Override
    public void onCompleted() {

        if (inError) {
            return;
        }
        try {
            if (current != null) {
                consumer.accept(current, responseObserver);
            }
            responseObserver.onCompleted();
        } catch (final Exception e) {
            onError(e);
        }

    }

    @Override
    public void onError(final Throwable throwable) {

        responseObserver.onError(throwable);
        inError = true;

    }

    @Override
    public void onNext(final R chunk) {

        if (inError) {
            return;
        }
        try {
            if (metaPredicate.test(chunk)) {
                if (current != null) {
                    consumer.accept(current, responseObserver);
                }
                current = objectSupplier.apply(chunk);
            } else {
                current = combiner.apply(current, chunk);
            }
        } catch (final Exception e) {
            onError(e);
        }
    }
}

我有 4 个 lamda

  • Predicate<R> metaPredicate 接受一个块和 returns 块是否是元数据。
  • Function<R, T> objectSupplier 它接受一个元块并创建一个供您的模块使用的新对象。
  • BiFunction<T, R, T> combiner, 接收数据块和当前对象,returns 包含组合的新对象。
  • BiConsumer<T, StreamObserver<S>> consumer 这将消耗一个完成的对象。它还在发送新对象作为响应的情况下传入一个流观察器。