GRPC 异步响应流 C#
GRPC async response stream C#
我如何从处理程序外部为 RPC 生成流式响应值? (具体来说,来自 IObservable)我目前正在执行以下操作,但这会产生跨线程问题,因为 AnRxObservable
在 RPC 处理程序之间共享...
public override Task GetTicker(RequestProto request, ServerCallContext context)
{
var subscription = AnRxObservable.Subscribe(value =>
{
responseStream.WriteAsync(new ResponseProto
{
Value = value
});
});
// Wait for the RPC to be canceled (my extension method
// that returns a task that completes when the CancellationToken
// is cancelled)
await context.CancellationToken.WhenCancelled();
// Dispose of the buffered stream
bufferedStream.Dispose();
// Dispose subscriber (tells rx that we aren't subscribed anymore)
subscription.Dispose();
return Task.FromResult(1);
}
这段代码感觉不对...但我看不到从 RPC 处理程序外部创建的共享源流式传输 RPC 响应的任何其他方式。
一般来说,当您尝试从推模型 (IObservable) 转换为拉模型(枚举响应以写入和写入它们)时,您需要一个消息的中间缓冲区 - 例如阻塞队列。然后,处理程序主体可以是一个异步循环,它尝试为队列获取下一条消息(最好以异步方式)并将其写入 responseStream。
另外,请注意 gRPC API 只允许您在任何给定时间有 1 个飞行中响应 - 而您的代码段不遵守这一点。因此,您需要在开始另一次写入之前等待 WriteAsync()(这是您需要中间队列的另一个原因)。
这个 link 可能有助于解释推与拉模式:When to use IEnumerable vs IObservable?
我如何从处理程序外部为 RPC 生成流式响应值? (具体来说,来自 IObservable)我目前正在执行以下操作,但这会产生跨线程问题,因为 AnRxObservable
在 RPC 处理程序之间共享...
public override Task GetTicker(RequestProto request, ServerCallContext context)
{
var subscription = AnRxObservable.Subscribe(value =>
{
responseStream.WriteAsync(new ResponseProto
{
Value = value
});
});
// Wait for the RPC to be canceled (my extension method
// that returns a task that completes when the CancellationToken
// is cancelled)
await context.CancellationToken.WhenCancelled();
// Dispose of the buffered stream
bufferedStream.Dispose();
// Dispose subscriber (tells rx that we aren't subscribed anymore)
subscription.Dispose();
return Task.FromResult(1);
}
这段代码感觉不对...但我看不到从 RPC 处理程序外部创建的共享源流式传输 RPC 响应的任何其他方式。
一般来说,当您尝试从推模型 (IObservable) 转换为拉模型(枚举响应以写入和写入它们)时,您需要一个消息的中间缓冲区 - 例如阻塞队列。然后,处理程序主体可以是一个异步循环,它尝试为队列获取下一条消息(最好以异步方式)并将其写入 responseStream。
另外,请注意 gRPC API 只允许您在任何给定时间有 1 个飞行中响应 - 而您的代码段不遵守这一点。因此,您需要在开始另一次写入之前等待 WriteAsync()(这是您需要中间队列的另一个原因)。
这个 link 可能有助于解释推与拉模式:When to use IEnumerable vs IObservable?