可重试的 grpc-web server-streaming rpc
Retryable grpc-web server-streaming rpc
我正在尝试用 rxjs.Observable 包装一个 grpc-web 服务器流客户端,如果说服务器 returns 出错,我能够执行重试。
考虑以下代码。
// server
foo = (call: (call: ServerWritableStream<FooRequest, Empty>): void => {
if (!call.request?.getMessage()) {
call.emit("error", { code: StatusCode.FAILED_PRECONDITION, message: "Invalid request" })
}
for (let i = 0; i <= 2; i++) {
call.write(new FooResponse())
}
call.end()
}
// client
test("should not end on retry", (done) => {
new Observable(obs => {
const call = new FooClient("http://localhost:8080").foo(new FooRequest())
call.on("data", data => obs.next(data))
call.on("error", err => {
console.log("server emitted error")
obs.error(err)
})
call.on("end", () => {
console.log("server emitted end")
obs.complete()
})
})
.pipe(retryWhen(<custom retry policy>))
.subscribe(
_resp => () => {},
_error => {
console.log("source observable error")
done()
},
() => {
console.log("source observable completed(?)")
done()
})
})
// output
server emitted error
server emitted end
source observable completed(?)
服务器在(?)发出“错误”之后发出“结束”事件,所以我似乎必须从源可观察对象中删除“结束”处理程序。
end/complete 流的“Rx-y”方式是什么?
对于任何感兴趣的人,我最终删除了“结束”事件处理程序并将其替换为“状态”,如果服务器 returns 一个 OK
状态代码(表示结束stream) 那么 observable 就完成了。
new Observable(obs => {
const call = new FooClient("http://localhost:8080").foo(new FooRequest())
call.on("data", data => obs.next(data))
call.on("error", err => obs.error(err))
call.on("status", status: grpcWeb.Status => {
if (status.code == grpcWeb.StatusCode.OK) {
return observer.complete()
}
})
})
我正在尝试用 rxjs.Observable 包装一个 grpc-web 服务器流客户端,如果说服务器 returns 出错,我能够执行重试。
考虑以下代码。
// server
foo = (call: (call: ServerWritableStream<FooRequest, Empty>): void => {
if (!call.request?.getMessage()) {
call.emit("error", { code: StatusCode.FAILED_PRECONDITION, message: "Invalid request" })
}
for (let i = 0; i <= 2; i++) {
call.write(new FooResponse())
}
call.end()
}
// client
test("should not end on retry", (done) => {
new Observable(obs => {
const call = new FooClient("http://localhost:8080").foo(new FooRequest())
call.on("data", data => obs.next(data))
call.on("error", err => {
console.log("server emitted error")
obs.error(err)
})
call.on("end", () => {
console.log("server emitted end")
obs.complete()
})
})
.pipe(retryWhen(<custom retry policy>))
.subscribe(
_resp => () => {},
_error => {
console.log("source observable error")
done()
},
() => {
console.log("source observable completed(?)")
done()
})
})
// output
server emitted error
server emitted end
source observable completed(?)
服务器在(?)发出“错误”之后发出“结束”事件,所以我似乎必须从源可观察对象中删除“结束”处理程序。
end/complete 流的“Rx-y”方式是什么?
对于任何感兴趣的人,我最终删除了“结束”事件处理程序并将其替换为“状态”,如果服务器 returns 一个 OK
状态代码(表示结束stream) 那么 observable 就完成了。
new Observable(obs => {
const call = new FooClient("http://localhost:8080").foo(new FooRequest())
call.on("data", data => obs.next(data))
call.on("error", err => obs.error(err))
call.on("status", status: grpcWeb.Status => {
if (status.code == grpcWeb.StatusCode.OK) {
return observer.complete()
}
})
})