kotlin grpc.StreamObserver 删除它 rx.PublishSubject
kotlin grpc.StreamObserver to deletate to rx.PublishSubject
每当我们声明流时使用 GRPC 时 api
rpc heartBeat(Empty) returns (stream ServiceStatus){}
我们有用于观察者模式的 google 简单接口 StreamObserver
(这是 protobuf 将为我们生成的)
public interface StreamObserver<V> {
void onNext(V var1);
void onError(Throwable var1);
void onCompleted();
}
现在您要做的是将其转换为实际的 Observable
,然后才将其传递以供进一步使用。
override fun heartBeat(arg: Empty): Observable<ServiceStatus> {
// we create rx java subject
val subject = PublishSubject.create<ServiceStatus>()
// we create grpc observer and delegate all calls to rx java
val observer = object : StreamObserver<ServiceStatus> {
override fun onNext(value: ServiceStatus) {
subject.onNext(value)
}
override fun onError(error: Throwable) {
subject.onError(error)
}
override fun onCompleted() {
subject.onCompleted()
}
}
// we use grpc observer for generated api
asyncStub.heartBeat(arg, observer)
// but we pass rx observable (subject) to client code
return subject
}
现在我是 Kotlin 的新手,但我无法弄清楚现有的委托功能有没有办法让 StreamObserver 的主题委托?
在Kotlin中写这段代码有没有更表达的方式?
我会创建一个通用方法来创建 StreamObserver
,将其传递给它的 lambda 参数并将结果包装在 Observable
.
中
inline fun <T> asObservable(
crossinline body: (StreamObserver<T>) -> Unit): Observable<T> {
return Observable.create { subscription ->
val observer = object : StreamObserver<T> {
override fun onNext(value: T) {
subscription.onNext(value)
}
override fun onError(error: Throwable) {
subscription.onError(error)
}
override fun onCompleted() {
subscription.onCompleted()
}
}
body(observer)
}
}
那么您可以通过以下方式实现RPC方法。
override fun heartBeat(arg: Empty): Observable<ServiceStatus> =
asObservable { asyncStub.heartBeat(arg, it) }
每当我们声明流时使用 GRPC 时 api
rpc heartBeat(Empty) returns (stream ServiceStatus){}
我们有用于观察者模式的 google 简单接口 StreamObserver
(这是 protobuf 将为我们生成的)
public interface StreamObserver<V> {
void onNext(V var1);
void onError(Throwable var1);
void onCompleted();
}
现在您要做的是将其转换为实际的 Observable
,然后才将其传递以供进一步使用。
override fun heartBeat(arg: Empty): Observable<ServiceStatus> {
// we create rx java subject
val subject = PublishSubject.create<ServiceStatus>()
// we create grpc observer and delegate all calls to rx java
val observer = object : StreamObserver<ServiceStatus> {
override fun onNext(value: ServiceStatus) {
subject.onNext(value)
}
override fun onError(error: Throwable) {
subject.onError(error)
}
override fun onCompleted() {
subject.onCompleted()
}
}
// we use grpc observer for generated api
asyncStub.heartBeat(arg, observer)
// but we pass rx observable (subject) to client code
return subject
}
现在我是 Kotlin 的新手,但我无法弄清楚现有的委托功能有没有办法让 StreamObserver 的主题委托? 在Kotlin中写这段代码有没有更表达的方式?
我会创建一个通用方法来创建 StreamObserver
,将其传递给它的 lambda 参数并将结果包装在 Observable
.
inline fun <T> asObservable(
crossinline body: (StreamObserver<T>) -> Unit): Observable<T> {
return Observable.create { subscription ->
val observer = object : StreamObserver<T> {
override fun onNext(value: T) {
subscription.onNext(value)
}
override fun onError(error: Throwable) {
subscription.onError(error)
}
override fun onCompleted() {
subscription.onCompleted()
}
}
body(observer)
}
}
那么您可以通过以下方式实现RPC方法。
override fun heartBeat(arg: Empty): Observable<ServiceStatus> =
asObservable { asyncStub.heartBeat(arg, it) }