Grpc - 使用 rx observable
Grpc - use rx observable
我正在尝试通过 GRPC 流公开一个可观察对象。
我的简化代码如下所示:
public override async Task Feed(Request request, IServerStreamWriter<Response> responseStream, ServerCallContext context)
{
var result = new Result();
try
{
await Observable.ForEachAsync(async value =>
{
await responseStream.WriteAsync(value);
});
}
catch (Exception ex)
{
Log.Info("Session ended:" + ex);
}
}
我收到以下错误:
W0123 14:30:59.709715
Grpc.Core.Internal.ServerStreamingServerCallHandler2 Exception
occured in handler. System.ArgumentException: Der Wert liegt außerhalb
des erwarteten Bereichs. bei
Grpc.Core.Internal.ServerStreamingServerCallHandler
2.d__4.MoveNext()
W0123 14:30:59.732716 Grpc.Core.Server Exception while handling RPC.
System.InvalidOperationException: Der Vorgang ist aufgrund des
aktuellen Zustands des Objekts ungültig. bei
Grpc.Core.Internal.AsyncCallServer2.SendStatusFromServerAsync(Status
status, Metadata trailers, Tuple
2 optionalWrite) bei
Grpc.Core.Internal.ServerStreamingServerCallHandler`2.d__4.MoveNext()
--- Ende der Stapelüberwachung vom vorhergehenden Ort, an dem die Ausnahme ausgelöst wurde --- bei
System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task
task) bei
System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task
task) bei Grpc.Core.Server.d__34.MoveNext()
您建议如何处理这个问题?我想我需要在同一线程中处理 ForEachAsync。
使用 gRPC 流 API,您一次只能写入一个项目。如果您在前一个操作完成之前启动另一个 WriteAsync() 操作,您将得到一个异常。您还需要在从方法处理程序(在本例中为 Feed 方法)返回之前完成所有写入。一次只允许写入一个的原因是为了确保 gRPC 的流量控制正常工作。
在您的情况下,Rx API 似乎无法确保这一点,因此解决此问题的一种方法是使用中间缓冲区。
这个怎么样?
public override async Task Feed(Request request, IServerStreamWriter<Response> responseStream, ServerCallContext context)
{
var result = new Result();
try
{
await Observable.Scan(Task.CompletedTask, (preceding,value) =>
preceding.ContinueWith(_ => responseStream.WriteAsync(value))
);
}
catch (Exception ex)
{
Log.Info("Session ended:" + ex);
}
}
我还没有测试过,但我认为它无论如何都可以工作。
有两个问题阻止 OP 的方法按预期工作,这两个问题都源于 responseStream.WriteAsync 一次只能等待一个写入的期望。
- 订阅 observable 后,每个 'onNext' 将在引发通知的同一线程上处理。这意味着您可以强制执行一次写入一个规则。您可以使用 ObserveOn 方法来控制每个通知的安排方式。
var eventLoop = new EventLoopScheduler();
var o = myObservable.ObserveOn(eventLoop);
- ForEachAsync 并不像人们想象的那样使用异步 lambda。如果您将 async/await 放在这里,而是像这样调用 WriteAsync,那么 Write 将在预期的上下文中执行。
responseStream.WriteAsync<Response>(value).Wait();
这是一个完整的例子...
public override async Task Feed(Request request, IServerStreamWriter<Response> responseStream, ServerCallContext context)
{
var o = getMyObservable(request);
try
{
var eventLoop = new EventLoopScheduler(); //just for example, use whatever scheduler makes sense for you.
await o.ObserveOn(eventLoop).ForEachAsync<Response>(value =>
{
responseStream.WriteAsync(value).Wait();
},
context.CancellationToken);
}
catch (TaskCanceledException) { }
catch (Exception ex)
{
Log.Info("Session ended:" + ex);
}
Log.Info("Session ended normally");
}
我正在尝试通过 GRPC 流公开一个可观察对象。 我的简化代码如下所示:
public override async Task Feed(Request request, IServerStreamWriter<Response> responseStream, ServerCallContext context)
{
var result = new Result();
try
{
await Observable.ForEachAsync(async value =>
{
await responseStream.WriteAsync(value);
});
}
catch (Exception ex)
{
Log.Info("Session ended:" + ex);
}
}
我收到以下错误:
W0123 14:30:59.709715 Grpc.Core.Internal.ServerStreamingServerCallHandler
2 Exception occured in handler. System.ArgumentException: Der Wert liegt außerhalb des erwarteten Bereichs. bei Grpc.Core.Internal.ServerStreamingServerCallHandler
2.d__4.MoveNext() W0123 14:30:59.732716 Grpc.Core.Server Exception while handling RPC. System.InvalidOperationException: Der Vorgang ist aufgrund des aktuellen Zustands des Objekts ungültig. bei Grpc.Core.Internal.AsyncCallServer2.SendStatusFromServerAsync(Status status, Metadata trailers, Tuple
2 optionalWrite) bei Grpc.Core.Internal.ServerStreamingServerCallHandler`2.d__4.MoveNext() --- Ende der Stapelüberwachung vom vorhergehenden Ort, an dem die Ausnahme ausgelöst wurde --- bei System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task) bei System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task) bei Grpc.Core.Server.d__34.MoveNext()
您建议如何处理这个问题?我想我需要在同一线程中处理 ForEachAsync。
使用 gRPC 流 API,您一次只能写入一个项目。如果您在前一个操作完成之前启动另一个 WriteAsync() 操作,您将得到一个异常。您还需要在从方法处理程序(在本例中为 Feed 方法)返回之前完成所有写入。一次只允许写入一个的原因是为了确保 gRPC 的流量控制正常工作。
在您的情况下,Rx API 似乎无法确保这一点,因此解决此问题的一种方法是使用中间缓冲区。
这个怎么样?
public override async Task Feed(Request request, IServerStreamWriter<Response> responseStream, ServerCallContext context)
{
var result = new Result();
try
{
await Observable.Scan(Task.CompletedTask, (preceding,value) =>
preceding.ContinueWith(_ => responseStream.WriteAsync(value))
);
}
catch (Exception ex)
{
Log.Info("Session ended:" + ex);
}
}
我还没有测试过,但我认为它无论如何都可以工作。
有两个问题阻止 OP 的方法按预期工作,这两个问题都源于 responseStream.WriteAsync 一次只能等待一个写入的期望。
- 订阅 observable 后,每个 'onNext' 将在引发通知的同一线程上处理。这意味着您可以强制执行一次写入一个规则。您可以使用 ObserveOn 方法来控制每个通知的安排方式。
var eventLoop = new EventLoopScheduler();
var o = myObservable.ObserveOn(eventLoop);
- ForEachAsync 并不像人们想象的那样使用异步 lambda。如果您将 async/await 放在这里,而是像这样调用 WriteAsync,那么 Write 将在预期的上下文中执行。
responseStream.WriteAsync<Response>(value).Wait();
这是一个完整的例子...
public override async Task Feed(Request request, IServerStreamWriter<Response> responseStream, ServerCallContext context)
{
var o = getMyObservable(request);
try
{
var eventLoop = new EventLoopScheduler(); //just for example, use whatever scheduler makes sense for you.
await o.ObserveOn(eventLoop).ForEachAsync<Response>(value =>
{
responseStream.WriteAsync(value).Wait();
},
context.CancellationToken);
}
catch (TaskCanceledException) { }
catch (Exception ex)
{
Log.Info("Session ended:" + ex);
}
Log.Info("Session ended normally");
}