"Bridging" 来自 gRPC StreamObserver 的 Reactor 的 Flux
"Bridging" Reactor's Flux from gRPC StreamObserver
我想创建一个 Reactor Flux from a gRPC StreamObserver. This needs to be done as long as StreamObserver does not implement the respective interfaces natively (see e.g. this issue).
我想出来的大概是这样的:
final StreamObserver<ProtoResponse>[] streamObserverArray = new StreamObserver[1];
Flux<Response> myFlux Flux.create(sink -> streamObserverArray[0] = new StreamObserver<ProtoResponse>() {
@Override
public void onNext(ProtoResponse value) {
final Response response = convertFromProto(value);
sink.next(response);
}
@Override
public void onError(Throwable throwable) {
sink.error(throwable);
}
@Override
public void onCompleted() {
sink.complete();
}
});
myFlux
.doOnError(throwable -> {/* actual logic in here */}) //
.doOnComplete(() -> {/* actual logic in here */}) //
.doOnCancel(() -> {/* actual logic in here */}) //
.parallel() //
.runOn(Schedulers.parallel()) //
.doOnNext(/* actual heavy lifting logic in here */) //
.map(/* ... */) //
.sequential() //
.doOnNext(/* ...*/) //
.subscribe(); // needed to start the actual processing of the events on this Flux
MyGrpcService.newStub(channel).getResponses(protoRequest, streamObserverArray[0]);
我想在这里使用 Reactor 的主要想法是将 "heavy lifting work" 分发到多个并行线程上,而不是在 gRPC 请求线程上执行此操作。
我发现上述方法存在几个问题:
- 我真的不喜欢
StreamObserver[]
数组的解决方法
- 我需要先创建完整的通量,因为如果我不先用
.subscribe()
完成它,当 gRPC 开始通信时 StreamObserver
可能是 null
(又名竞赛条件)。
- 我不确定背压是否按预期方式工作(虽然这不是我目前主要关心的问题)。
所以我的问题是:
从 gRPC StreamObserver 桥接到 Reactor Flux 的 best/preferred 方法是什么?有什么最佳做法吗?
经过一番折腾并更好地理解整个响应式内容后,我想出了以下解决方案:
/**
* Bridge the StreamObserver from gRPC to the Publisher from the reactive world.
*/
public class StreamObserverPublisher implements Publisher<Long>, StreamObserver<Long> {
private Subscriber<? super Long> subscriber;
@Override
public void onNext(Long l) {
subscriber.onNext(l);
}
@Override
public void onError(Throwable throwable) {
subscriber.onError(throwable);
}
@Override
public void onCompleted() {
subscriber.onComplete();
}
@Override
public void subscribe(Subscriber<? super Long> subscriber) {
this.subscriber = subscriber;
this.subscriber.onSubscribe(new BaseSubscriber() {});
}
}
// and somewhere else in the code
StreamObserverPublisher streamObserverPublisher = new StreamObserverPublisher();
Flux<Long> longFlux = Flux.from(streamObserverPublisher);
longFlux.subscribe(...); // must be done before executing the gRPC request
MyGrpcService.newStub(channel).getResponses(protoRequest, streamObserverPublisher);
现在有一个更简单的解决方案:
https://github.com/salesforce/reactive-grpc.
它支持将 gRPC 桥接到 Reactor 和 RxJava 2。
我想创建一个 Reactor Flux from a gRPC StreamObserver. This needs to be done as long as StreamObserver does not implement the respective interfaces natively (see e.g. this issue).
我想出来的大概是这样的:
final StreamObserver<ProtoResponse>[] streamObserverArray = new StreamObserver[1];
Flux<Response> myFlux Flux.create(sink -> streamObserverArray[0] = new StreamObserver<ProtoResponse>() {
@Override
public void onNext(ProtoResponse value) {
final Response response = convertFromProto(value);
sink.next(response);
}
@Override
public void onError(Throwable throwable) {
sink.error(throwable);
}
@Override
public void onCompleted() {
sink.complete();
}
});
myFlux
.doOnError(throwable -> {/* actual logic in here */}) //
.doOnComplete(() -> {/* actual logic in here */}) //
.doOnCancel(() -> {/* actual logic in here */}) //
.parallel() //
.runOn(Schedulers.parallel()) //
.doOnNext(/* actual heavy lifting logic in here */) //
.map(/* ... */) //
.sequential() //
.doOnNext(/* ...*/) //
.subscribe(); // needed to start the actual processing of the events on this Flux
MyGrpcService.newStub(channel).getResponses(protoRequest, streamObserverArray[0]);
我想在这里使用 Reactor 的主要想法是将 "heavy lifting work" 分发到多个并行线程上,而不是在 gRPC 请求线程上执行此操作。
我发现上述方法存在几个问题:
- 我真的不喜欢
StreamObserver[]
数组的解决方法 - 我需要先创建完整的通量,因为如果我不先用
.subscribe()
完成它,当 gRPC 开始通信时StreamObserver
可能是null
(又名竞赛条件)。 - 我不确定背压是否按预期方式工作(虽然这不是我目前主要关心的问题)。
所以我的问题是: 从 gRPC StreamObserver 桥接到 Reactor Flux 的 best/preferred 方法是什么?有什么最佳做法吗?
经过一番折腾并更好地理解整个响应式内容后,我想出了以下解决方案:
/**
* Bridge the StreamObserver from gRPC to the Publisher from the reactive world.
*/
public class StreamObserverPublisher implements Publisher<Long>, StreamObserver<Long> {
private Subscriber<? super Long> subscriber;
@Override
public void onNext(Long l) {
subscriber.onNext(l);
}
@Override
public void onError(Throwable throwable) {
subscriber.onError(throwable);
}
@Override
public void onCompleted() {
subscriber.onComplete();
}
@Override
public void subscribe(Subscriber<? super Long> subscriber) {
this.subscriber = subscriber;
this.subscriber.onSubscribe(new BaseSubscriber() {});
}
}
// and somewhere else in the code
StreamObserverPublisher streamObserverPublisher = new StreamObserverPublisher();
Flux<Long> longFlux = Flux.from(streamObserverPublisher);
longFlux.subscribe(...); // must be done before executing the gRPC request
MyGrpcService.newStub(channel).getResponses(protoRequest, streamObserverPublisher);
现在有一个更简单的解决方案:
https://github.com/salesforce/reactive-grpc.
它支持将 gRPC 桥接到 Reactor 和 RxJava 2。