可重试的 grpc-web server-streaming rpc

Retryable grpc-web server-streaming rpc

我正在尝试用 rxjs.Observable 包装一个 grpc-web 服务器流客户端,如果说服务器 returns 出错,我能够执行重试。

考虑以下代码。

  // server
  foo = (call: (call: ServerWritableStream<FooRequest, Empty>): void => {
    if (!call.request?.getMessage()) {
      call.emit("error", { code: StatusCode.FAILED_PRECONDITION, message: "Invalid request" })
    }

    for (let i = 0; i <= 2; i++) {
      call.write(new FooResponse())
    }
    call.end()
  }
 
  // client 
  test("should not end on retry", (done) => {
    new Observable(obs => {
      const call =  new FooClient("http://localhost:8080").foo(new FooRequest())
      call.on("data", data => obs.next(data))
      call.on("error", err => {
        console.log("server emitted error")
        obs.error(err)
      })
      call.on("end", () => {
        console.log("server emitted end")
        obs.complete()
      })
    })
    .pipe(retryWhen(<custom retry policy>))
    .subscribe(
        _resp => () => {},
        _error => {
          console.log("source observable error")
          done()
        },
        () => {
          console.log("source observable completed(?)")
          done()
        })
  })
  
  // output
  server emitted error
  server emitted end
  source observable completed(?)

服务器在(?)发出“错误”之后发出“结束”事件,所以我似乎必须从源可观察对象中删除“结束”处理程序。

end/complete 流的“Rx​​-y”方式是什么?

对于任何感兴趣的人,我最终删除了“结束”事件处理程序并将其替换为“状态”,如果服务器 returns 一个 OK 状态代码(表示结束stream) 那么 observable 就完成了。

new Observable(obs => {
  const call =  new FooClient("http://localhost:8080").foo(new FooRequest())
  call.on("data", data => obs.next(data))
  call.on("error", err => obs.error(err))
  call.on("status", status: grpcWeb.Status => {
    if (status.code == grpcWeb.StatusCode.OK) {
      return observer.complete()
    }
  })
})