"Bridging" Reactor's Flux from gRPC StreamObserver

I would like to create a Reactor Flux from a gRPC StreamObserver. This is necessary as long as StreamObserver does not implement the respective interfaces natively (see e.g. this issue).

What I came up with is roughly the following:

final StreamObserver<ProtoResponse>[] streamObserverArray = new StreamObserver[1];
Flux<Response> myFlux = Flux.create(sink -> streamObserverArray[0] = new StreamObserver<ProtoResponse>() {
        @Override
        public void onNext(ProtoResponse value) {
            final Response response = convertFromProto(value);
            sink.next(response);
        }

        @Override
        public void onError(Throwable throwable) {
            sink.error(throwable);
        }

        @Override
        public void onCompleted() {
            sink.complete();
        }
    });
myFlux
    .doOnError(throwable -> {/* actual logic in here */}) //
    .doOnComplete(() -> {/* actual logic in here */}) //
    .doOnCancel(() -> {/* actual logic in here */}) //
    .parallel() //
    .runOn(Schedulers.parallel()) //
    .doOnNext(/* actual heavy lifting logic in here */) //
    .map(/* ... */) //
    .sequential() //
    .doOnNext(/* ...*/) //
    .subscribe(); // needed to start the actual processing of the events on this Flux

MyGrpcService.newStub(channel).getResponses(protoRequest, streamObserverArray[0]);

The main reason I want to use Reactor here is to distribute the "heavy lifting work" across multiple parallel threads instead of doing it on the gRPC request thread.
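To show the off-loading idea in isolation, here is a minimal sketch that uses only the JDK's ExecutorService as a stand-in for Reactor's parallel().runOn(...). It is not how Reactor works internally; the names OffloadSketch, heavyLifting and processOffThread, as well as the pool size of 4, are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class OffloadSketch {

    /** Hypothetical placeholder for the expensive per-message step. */
    static long heavyLifting(long value) {
        return value * value;
    }

    /**
     * Runs heavyLifting for every input on a worker pool.
     * ranOnCaller[0] is set to true if any task executed on the calling thread.
     */
    public static List<Long> processOffThread(List<Long> inputs, boolean[] ranOnCaller) throws Exception {
        Thread caller = Thread.currentThread();
        ExecutorService workers = Executors.newFixedThreadPool(4); // assumed pool size
        try {
            List<Future<Long>> pending = new ArrayList<>();
            for (Long input : inputs) {
                // The calling ("callback") thread only enqueues; a worker does the computation.
                pending.add(workers.submit(() -> {
                    if (Thread.currentThread() == caller) {
                        ranOnCaller[0] = true;
                    }
                    return heavyLifting(input);
                }));
            }
            List<Long> results = new ArrayList<>();
            for (Future<Long> future : pending) {
                results.add(future.get()); // collected in submission order
            }
            return results;
        } finally {
            workers.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        boolean[] ranOnCaller = new boolean[1];
        System.out.println(processOffThread(List.of(1L, 2L, 3L), ranOnCaller));
        System.out.println("ran on caller thread: " + ranOnCaller[0]);
    }
}
```

The point is only that the thread delivering the messages returns quickly while the expensive step runs elsewhere, which is what the parallel()/runOn()/sequential() chain above is meant to achieve.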

I see several problems with the approach above:

So my question is: what is the best/preferred way to bridge from a gRPC StreamObserver to a Reactor Flux? Are there any best practices?

After some more fiddling and a better understanding of the whole reactive stuff, I came up with the following solution:

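import io.grpc.stub.StreamObserver;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import reactor.core.publisher.BaseSubscriber;

/**
 * Bridge the StreamObserver from gRPC to the Publisher from the reactive world.
 */
public class StreamObserverPublisher implements Publisher<Long>, StreamObserver<Long> {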

    private Subscriber<? super Long> subscriber;

    @Override
    public void onNext(Long l) {
        subscriber.onNext(l);
    }

    @Override
    public void onError(Throwable throwable) {
        subscriber.onError(throwable);
    }

    @Override
    public void onCompleted() {
        subscriber.onComplete();
    }

    @Override
    public void subscribe(Subscriber<? super Long> subscriber) {
        this.subscriber = subscriber;
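        // BaseSubscriber doubles as a no-op Subscription here: request(n) and cancel() are not propagated to gRPC.
        this.subscriber.onSubscribe(new BaseSubscriber<Long>() {});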
    }
}

// and somewhere else in the code
StreamObserverPublisher streamObserverPublisher = new StreamObserverPublisher();
Flux<Long> longFlux = Flux.from(streamObserverPublisher);
longFlux.subscribe(...); // must be done before executing the gRPC request
MyGrpcService.newStub(channel).getResponses(protoRequest, streamObserverPublisher);
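For comparison, the same bridge shape can be sketched with the JDK alone: java.util.concurrent.SubmissionPublisher already implements Flow.Publisher and buffers items per subscriber, so the observer only has to forward into it. This is a sketch under assumptions, not the Reactor solution above: StreamObserver is redeclared here as a hypothetical stand-in for gRPC's interface so that the example runs without gRPC or Reactor on the classpath, and ObserverBridgeSketch, BridgingObserver and collect are illustrative names.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

public class ObserverBridgeSketch {

    /** Hypothetical stand-in for io.grpc.stub.StreamObserver (same three callbacks). */
    interface StreamObserver<T> {
        void onNext(T value);
        void onError(Throwable t);
        void onCompleted();
    }

    /** Forwards observer callbacks into a SubmissionPublisher. */
    static final class BridgingObserver<T> implements StreamObserver<T> {
        private final SubmissionPublisher<T> publisher = new SubmissionPublisher<>();

        Flow.Publisher<T> asPublisher() {
            return publisher;
        }

        @Override
        public void onNext(T value) {
            publisher.submit(value); // blocks the caller if a subscriber's buffer is full
        }

        @Override
        public void onError(Throwable t) {
            publisher.closeExceptionally(t);
        }

        @Override
        public void onCompleted() {
            publisher.close();
        }
    }

    /** Pushes the values through the bridge and returns what the subscriber received. */
    public static List<Long> collect(long... values) throws InterruptedException {
        BridgingObserver<Long> bridge = new BridgingObserver<>();
        List<Long> received = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        // Subscribe first: items submitted while there is no subscriber are dropped.
        bridge.asPublisher().subscribe(new Flow.Subscriber<Long>() {
            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                subscription.request(Long.MAX_VALUE); // unbounded demand
            }

            @Override
            public void onNext(Long item) {
                received.add(item); // runs on the publisher's executor, not the caller
            }

            @Override
            public void onError(Throwable t) {
                done.countDown();
            }

            @Override
            public void onComplete() {
                done.countDown();
            }
        });

        // Simulates the gRPC runtime invoking the observer.
        for (long value : values) {
            bridge.onNext(value);
        }
        bridge.onCompleted();

        done.await(5, TimeUnit.SECONDS);
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(collect(1L, 2L, 3L));
    }
}
```

As with the StreamObserverPublisher above, the subscriber has to be attached before the call starts delivering messages. The difference is that the publisher here keeps a buffer per subscriber and honours its demand instead of calling onNext directly.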

There is now a much simpler solution:

https://github.com/salesforce/reactive-grpc.

It supports bridging gRPC to both Reactor and RxJava 2.