在异步 grpc 存根中取消订阅观察者 - Java / Kotlin

Unsubscribe observer in Async grpc stub - Java / Kotlin

我有一个异步存根,我在其中添加了一个观察者:

            val obs =  object: StreamObserver<Hallo> {

                override fun onNext(value: Hallo) {

                    streamSuccess(value)
                }

                override fun onError(t: Throwable?) {

                    nonSuccess(t?.message ?: "Unknow error")
                }

                override fun onCompleted() {

                    Log.d("Info", "completed")
                    completed()
                }
            }

我希望能够从异步存根中删除这个观察者,这样我就可以在客户端取消流式传输。

如 github 问题所述:https://github.com/grpc/grpc-java/issues/3095

我尝试保留观察者的局部变量,以便客户端稍后可以做:

observer?.onError(Status.CANCELLED.cause)

那没用。

我还尝试从摘要 class 中创建自己的 class:ClientCallStreamObserver

class CancellableStreamObserver<TResponse>(val next:(value:TResponse)->Unit, val onError:(t:Throwable)-> Unit, val onCompleted:(()->Unit), val onCanceledHandler: (()->Unit)? = null) : ClientCallStreamObserver<TResponse>() {
        override fun isReady(): Boolean {
            return  true
        }

        override fun setOnReadyHandler(onReadyHandler: Runnable?) {
            //TODO("not implemented") //To change body of created functions use File | Settings | File Templates.
        }

        override fun disableAutoInboundFlowControl() {
            //TODO("not implemented") //To change body of created functions use File | Settings | File Templates.
        }

        override fun cancel(message: String?, cause: Throwable?) {

            //TODO("not implemented") //To change body of created functions use File | Settings | File Templates.
        }

        override fun request(count: Int) {
            //TODO("not implemented") //To change body of created functions use File | Settings | File Templates.
        }

        override fun setMessageCompression(enable: Boolean) {
            //TODO("not implemented") //To change body of created functions use File | Settings | File Templates.
        }

        override fun onNext(value: TResponse) {
            next(value)
        }

        override fun onError(t: Throwable) {
            if (t is StatusException) {
                if (t.status.code == Status.Code.CANCELLED) {
                    onCanceledHandler?.let {
                        it()
                    }
                }
            }
            if (t is StatusRuntimeException) {
                if (t.status.code == Status.Code.CANCELLED) {
                    onCanceledHandler?.let {
                        it()
                    }
                }
            }
            this.onError(t)
        }

        override fun onCompleted() {
            onCompleted()
        }
    }

稍后我可以打电话给:

        observer?.cancel("Cancelled for the user",Status.CANCELLED.cause)

那也没用。

据我所知,它不起作用,因为如果用户再次添加一个新的观察者,我会得到重复的响应,就好像旧的观察者还活着一样。

我知道我可以用 channel.shutdownNow() 关闭频道。但是我觉得太霸道了。

谢谢

来自引用的https://github.com/grpc/grpc-java/issues/3095

for async you can use ClientCallStreamObserver.cancel() by casting the returned StreamObserver to ClientCallStreamObserver or implementing having your passed-in StreamObserver implement ClientResponseObserver.

(强调)

grpc-java 将实现适当的方法,而不是您的实例。所以模式将是:

stub.foo(req, object: ClientResponseObserver<Hallo> {
    override fun beforeStart(respObs: ClientCallStreamObserver<Hallo>) {
        // save respObs for later
    }
    override fun onNext(value: Hallo) {
        streamSuccess(value)
    }
    override fun onError(t: Throwable?) {
        nonSuccess(t?.message ?: "Unknow error")
    }
    override fun onCompleted() {
        Log.d("Info", "completed")
        completed()
    }
});

// -or- (for streaming calls only)

val obs = ...;
val respObs = stub.foo(obs) as (ClientCallStreamObserver<Hallo>);
respObs.onNext(req);
// save respObs for later

请注意,两种情况下的 respObs 是相同的。使用 ClientResponseObserver 主要用于有流式传输并希望在响应观察器中取消以避免线程竞争的情况。