gRPC 间歇性地具有高延迟
gRPC intermittently has high delays
我有一个公开 gRPC 双向端点的服务器应用程序(C# 和 .Net 5)。此端点采用二进制流,服务器在其中分析并生成发送回 gRPC 响应流的响应。
通过 gRPC 发送的每个文件都是几兆字节,gRPC 调用需要几分钟才能完成流式传输(无延迟)。由于延迟,这个时间有时会增加 50%。
在客户端,我有 2 个任务 (Task.Run
) 运行,一个使用 FileStream
从客户端的文件系统流式传输文件,另一个读取来自服务器的响应 ( gRPC).
同样在服务器上,我有 2 个任务 运行,一个从 gRPC 请求流中读取消息并将它们推入队列 (DataFlow.BufferBlock<byte[]>
),另一个处理队列中的消息,以及写对 gRPC 的响应。
问题:
如果我禁用(注释掉)所有服务器处理代码,并简单地从 gRPC 读取和记录消息,那么从客户端到服务器的延迟几乎为 0。
当服务器启用处理时,客户端在写入 grpcClient 时会出现延迟。
只有 10 个活动的并行会话(gRPC 调用),这些延迟可能会达到 10-15 秒。
PS: 只有当我有多个客户端时才会发生这种情况运行,并发客户端数量越多意味着延迟越长。
客户端代码如下所示:
FileStream fs = new(audioFilePath, FileMode.Open, FileAccess.Read, FileShare.Read, 1024 * 1024, true);
byte[] buffer = new byte[10_000];
GrpcClient client = new GrpcClient(_singletonChannel); // using single channel since only 5-10 clients are there right now
BiDiCall call = client.BiDiService(hheaders: null, deadline: null, CancellationToken.None);
var writeTask = Task.Run(async () => {
while (fs.ReadAsync(buffer, 0, buffer.Length))
{
call.RequestStream.WriteAsync(new() { Chunk = ByteString.CopyFrom(buffer) });
}
await call.RequestStream.CompleteAsync();
});
var readTask = Task.Run(async () => {
while (await call.ResponseStream.MoveNext())
{
// write to log call.ResponseStream.Current
}
});
await Task.WhenAll(writeTask, readTask);
await call;
服务器代码如下:
readonly BufferBlock<MessageRequest> messages = new();
MessageProcessor _processor = new();
public override async Task BiDiService(IAsyncStreamReader<MessageRequest> requestStream,
IServerStreamWriter<MessageResponse> responseStream,
ServerCallContext context)
{
var readTask = TaskFactory.StartNew(() => {
while (await requestStream.MoveNext())
{
messages.Post(requestStream.Current); // add to queue
}
messages.Complete();
}, TaskCreationOptions.LongRunning).ConfigureAwait(false);
var processTask = Task.Run(() => {
while (await messages.OutputAvailableAsync())
{
var message = await messages.ReceiveAsync(); // pick from queue
// if I comment out below line and run with multiple clients = latency disappears
var result = await _processor.Process(message); // takes some time to process
if (result.IsImportantForClient())
await responseStrem.WriteAsync(result.Value);
}
});
await Task.WhenAll(readTask, processTask);
}
对 SO 的初始问题有许多有希望的评论,但我想解释一下我认为重要的内容:
- 一个调用 2
的外部异步方法
- Task.Run() 的 - 带有包装异步循环的 TaskCreationOptions.LongRunning 选项,最后是
- returns a Task.WhenAll() 重新加入两个任务...
Alois Kraus 提出 OS 任务调度器是一个 OS 并且它的调度可以抽象出您可能认为更有效的东西 - 这很可能是真的,如果它是
我会建议您尝试删除异步处理,看看您可能会看到各种 sync/async 混合的好处差异可能更适合您的特定场景。
要确保记住的一件事是 asynce/await 在逻辑上以自动线程管理为代价阻塞 - 这对于单路径 I/O 绑定处理非常有用(例如需要调用 db/webservice 在继续下一步执行之前)并且随着您转向计算绑定处理(需要明确重新加入的执行 - async/await 隐式处理任务重新加入)
因此,事实证明,问题是由于 ThreadPool
.
产生的工作线程数量延迟造成的
ThreadPool
花费更多时间来生成线程来处理这些任务,导致 gRPC 读取有明显的滞后。
使用 ThreadPool.SetMinThreads
增加生成请求的 minThread
计数后已修复此问题。 MSDN reference
我有一个公开 gRPC 双向端点的服务器应用程序(C# 和 .Net 5)。此端点采用二进制流,服务器在其中分析并生成发送回 gRPC 响应流的响应。
通过 gRPC 发送的每个文件都是几兆字节,gRPC 调用需要几分钟才能完成流式传输(无延迟)。由于延迟,这个时间有时会增加 50%。
在客户端,我有 2 个任务 (Task.Run
) 运行,一个使用 FileStream
从客户端的文件系统流式传输文件,另一个读取来自服务器的响应 ( gRPC).
同样在服务器上,我有 2 个任务 运行,一个从 gRPC 请求流中读取消息并将它们推入队列 (DataFlow.BufferBlock<byte[]>
),另一个处理队列中的消息,以及写对 gRPC 的响应。
问题:
如果我禁用(注释掉)所有服务器处理代码,并简单地从 gRPC 读取和记录消息,那么从客户端到服务器的延迟几乎为 0。
当服务器启用处理时,客户端在写入 grpcClient 时会出现延迟。
只有 10 个活动的并行会话(gRPC 调用),这些延迟可能会达到 10-15 秒。
PS: 只有当我有多个客户端时才会发生这种情况运行,并发客户端数量越多意味着延迟越长。
客户端代码如下所示:
FileStream fs = new(audioFilePath, FileMode.Open, FileAccess.Read, FileShare.Read, 1024 * 1024, true);
byte[] buffer = new byte[10_000];
GrpcClient client = new GrpcClient(_singletonChannel); // using single channel since only 5-10 clients are there right now
BiDiCall call = client.BiDiService(hheaders: null, deadline: null, CancellationToken.None);
var writeTask = Task.Run(async () => {
while (fs.ReadAsync(buffer, 0, buffer.Length))
{
call.RequestStream.WriteAsync(new() { Chunk = ByteString.CopyFrom(buffer) });
}
await call.RequestStream.CompleteAsync();
});
var readTask = Task.Run(async () => {
while (await call.ResponseStream.MoveNext())
{
// write to log call.ResponseStream.Current
}
});
await Task.WhenAll(writeTask, readTask);
await call;
服务器代码如下:
readonly BufferBlock<MessageRequest> messages = new();
MessageProcessor _processor = new();
public override async Task BiDiService(IAsyncStreamReader<MessageRequest> requestStream,
IServerStreamWriter<MessageResponse> responseStream,
ServerCallContext context)
{
var readTask = TaskFactory.StartNew(() => {
while (await requestStream.MoveNext())
{
messages.Post(requestStream.Current); // add to queue
}
messages.Complete();
}, TaskCreationOptions.LongRunning).ConfigureAwait(false);
var processTask = Task.Run(() => {
while (await messages.OutputAvailableAsync())
{
var message = await messages.ReceiveAsync(); // pick from queue
// if I comment out below line and run with multiple clients = latency disappears
var result = await _processor.Process(message); // takes some time to process
if (result.IsImportantForClient())
await responseStrem.WriteAsync(result.Value);
}
});
await Task.WhenAll(readTask, processTask);
}
对 SO 的初始问题有许多有希望的评论,但我想解释一下我认为重要的内容:
- 一个调用 2 的外部异步方法
- Task.Run() 的 - 带有包装异步循环的 TaskCreationOptions.LongRunning 选项,最后是
- returns a Task.WhenAll() 重新加入两个任务... Alois Kraus 提出 OS 任务调度器是一个 OS 并且它的调度可以抽象出您可能认为更有效的东西 - 这很可能是真的,如果它是
我会建议您尝试删除异步处理,看看您可能会看到各种 sync/async 混合的好处差异可能更适合您的特定场景。 要确保记住的一件事是 asynce/await 在逻辑上以自动线程管理为代价阻塞 - 这对于单路径 I/O 绑定处理非常有用(例如需要调用 db/webservice 在继续下一步执行之前)并且随着您转向计算绑定处理(需要明确重新加入的执行 - async/await 隐式处理任务重新加入)
因此,事实证明,问题是由于 ThreadPool
.
ThreadPool
花费更多时间来生成线程来处理这些任务,导致 gRPC 读取有明显的滞后。
使用 ThreadPool.SetMinThreads
增加生成请求的 minThread
计数后已修复此问题。 MSDN reference