HttpClient GetAsync ThreadPool Starvation
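We have a microservice-oriented backend stack. All of the microservices are built on top of Nancy and registered as Windows services with Topshelf.
One of the services, which handles most of the traffic (~5000 req/s), started hitting thread pool starvation on 3 of its 8 servers.
This is the exception we get when hitting a specific endpoint: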
System.InvalidOperationException: There were not enough free threads in the ThreadPool to complete the operation.
at System.Net.HttpWebRequest.BeginGetResponse(AsyncCallback callback, Object state)
at System.Net.Http.HttpClientHandler.StartGettingResponse(RequestState state)
at System.Net.Http.HttpClientHandler.StartRequest(Object obj)
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at RandomNamedClient.<GetProductBySkuAsync>d__20.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at ProductService.<GetBySkuAsync>d__3.MoveNext() in ...\ProductService.cs:line 34
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at ProductModule.<>c__DisplayClass15.<<.ctor>b__b>d__1d.MoveNext() in ...\ProductModule.cs:line 32
This endpoint calls another service (outside my team's domain) to get product data. The implementation is as follows:
Get["/product/sku/{sku}", true] = async (parameters, ctx) =>
{
    string sku = parameters.sku;
    var product = await productService.GetBySkuAsync(sku);
    return Response.AsJson(new ProductRepresentation(product));
};
ProductService.GetBySkuAsync(string sku) implementation:
public async Task<Product> GetBySkuAsync(string sku)
{
    var productDto = await randomNamedClient.GetProductBySkuAsync(sku);
    if (productDto == null)
    {
        throw new ProductDtoNotFoundException("sku", sku);
    }
    var variantDto = productDto.VariantList.FirstOrDefault(v => v.Sku == sku);
    if (variantDto == null)
    {
        throw new ProductVariantDtoNotFoundException("sku", sku);
    }
    return MapVariantDtoToProduct(variantDto, productDto);
}
RandomNamedClient.GetProductBySkuAsync(string sku) implementation (from an internal package):
public async Task<ProductDto> GetProductBySkuAsync(string sku)
{
    HttpResponseMessage result = await this._serviceClient.GetAsync("Product?Sku=" + sku);
    return result == null || result.StatusCode != HttpStatusCode.OK ? (ProductDto) null : this.Decompress<ProductDto>(result);
}
RandomNamedClient.Decompress<T>(HttpResponseMessage response) implementation:
private T Decompress<T>(HttpResponseMessage response)
{
    if (!response.Content.Headers.ContentEncoding.Contains("gzip"))
        return HttpContentExtensions.ReadAsAsync<T>(response.Content).Result;
    using (GZipStream gzipStream = new GZipStream((Stream) new MemoryStream(response.Content.ReadAsByteArrayAsync().Result), CompressionMode.Decompress))
    {
        byte[] buffer = new byte[8192];
        using (MemoryStream memoryStream = new MemoryStream())
        {
            int count;
            do
            {
                count = gzipStream.Read(buffer, 0, 8192);
                if (count > 0)
                    memoryStream.Write(buffer, 0, count);
            }
            while (count > 0);
            return JsonConvert.DeserializeObject<T>(Encoding.UTF8.GetString(memoryStream.ToArray()));
        }
    }
}
All of our services are built as Release/32-bit. We have not customized anything about how the thread pool is used.
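Since the exception message refers to free threads in the ThreadPool, it can help to log the pool's limits and current availability on an affected server. The following is only a minimal diagnostic sketch; the class and method names (ThreadPoolSnapshot, Describe) are made up for illustration and are not part of the service.

```csharp
using System;
using System.Threading;

// Hypothetical helper (not from the original code base).
public static class ThreadPoolSnapshot
{
    // Returns a one-line summary of worker and I/O completion thread
    // limits plus the number of threads currently available.
    public static string Describe()
    {
        int minWorker, minIo, maxWorker, maxIo, freeWorker, freeIo;
        ThreadPool.GetMinThreads(out minWorker, out minIo);
        ThreadPool.GetMaxThreads(out maxWorker, out maxIo);
        ThreadPool.GetAvailableThreads(out freeWorker, out freeIo);

        return string.Format(
            "worker: min={0} max={1} free={2}; io: min={3} max={4} free={5}",
            minWorker, maxWorker, freeWorker, minIo, maxIo, freeIo);
    }
}
```

Logging ThreadPoolSnapshot.Describe() periodically, or when the exception is caught, would show whether the free worker count drops toward zero under load.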
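The biggest problem I see with this code is the Decompress<T> method, which blocks on an async operation using Task.Result. This can keep the thread that is currently processing the request from being returned to the thread pool, or even worse, cause your code to deadlock (which is exactly why you shouldn't block on async code). I'm not sure whether you've seen those requests get processed all the way through, but if NancyFX is handling marshaling of the synchronization context for you (which it seems to do), that could very well be the root cause of the thread pool starvation.
You can change that by making all the IO processing inside that method async as well, taking advantage of the naturally async APIs those classes already expose. Alternatively, and I definitely do not recommend this, you could use ConfigureAwait(false) everywhere.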
(Side note: you could simplify your code by using Stream.CopyToAsync().)
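To make the difference concrete, here is a small contrast between blocking on a task and awaiting it. It is only an illustrative sketch: Task.Delay stands in for real network I/O, and the names SyncOverAsyncContrast, FetchAsync, FetchBlocking and FetchWithoutBlockingAsync are hypothetical.

```csharp
using System.Threading.Tasks;

// Hypothetical illustration (not from the original code base).
public static class SyncOverAsyncContrast
{
    // Simulated I/O: Task.Delay stands in for a network read.
    private static async Task<int> FetchAsync()
    {
        await Task.Delay(50);
        return 42;
    }

    // Sync-over-async: the calling thread is held until the task completes,
    // which is the same pattern Decompress<T> uses with Task.Result.
    public static int FetchBlocking()
    {
        return FetchAsync().Result;
    }

    // Async all the way: the calling thread is released at the await,
    // and the rest of the method runs later as a continuation.
    public static async Task<int> FetchWithoutBlockingAsync()
    {
        return await FetchAsync();
    }
}
```

Both methods return the same value; the difference is that FetchBlocking keeps its thread occupied for the whole wait, which adds up quickly at thousands of requests per second.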
A proper async implementation would look like this:
private async Task<T> DecompressAsync<T>(HttpResponseMessage response)
{
    if (!response.Content.Headers.ContentEncoding.Contains("gzip"))
        return await response.Content.ReadAsAsync<T>();

    const int bufferSize = 8192;
    using (GZipStream gzipStream = new GZipStream(
        new MemoryStream(
            await response.Content.ReadAsByteArrayAsync()),
        CompressionMode.Decompress))
    using (MemoryStream memoryStream = new MemoryStream())
    {
        await gzipStream.CopyToAsync(memoryStream, bufferSize);
        return JsonConvert.DeserializeObject<T>(
            Encoding.UTF8.GetString(memoryStream.ToArray()));
    }
}
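Note that the caller has to change too: once decompression is async, the client method needs to await it rather than call it synchronously. The sketch below shows that shape end to end using only framework types, so it returns the decompressed body as a string instead of deserializing it with Json.NET. GzipResponseReader, ReadBodyOrNullAsync and CreateGzipResponse are hypothetical names introduced for this example, and the in-memory response exists only so the code can run without a network call.

```csharp
using System.IO;
using System.IO.Compression;
using System.Net;
using System.Net.Http;
using System.Text;
using System.Threading.Tasks;

// Hypothetical stand-in for the internal client (illustration only).
public static class GzipResponseReader
{
    // Mirrors the status check in GetProductBySkuAsync, but awaits the
    // decompression instead of blocking on it.
    public static async Task<string> ReadBodyOrNullAsync(HttpResponseMessage response)
    {
        if (response == null || response.StatusCode != HttpStatusCode.OK)
            return null;
        return await DecompressToStringAsync(response);
    }

    private static async Task<string> DecompressToStringAsync(HttpResponseMessage response)
    {
        if (!response.Content.Headers.ContentEncoding.Contains("gzip"))
            return await response.Content.ReadAsStringAsync();

        byte[] compressed = await response.Content.ReadAsByteArrayAsync();
        using (var gzipStream = new GZipStream(new MemoryStream(compressed), CompressionMode.Decompress))
        using (var memoryStream = new MemoryStream())
        {
            await gzipStream.CopyToAsync(memoryStream);
            return Encoding.UTF8.GetString(memoryStream.ToArray());
        }
    }

    // Builds an in-memory 200 response with a gzip-compressed UTF-8 body.
    public static HttpResponseMessage CreateGzipResponse(string body)
    {
        var buffer = new MemoryStream();
        using (var gzip = new GZipStream(buffer, CompressionMode.Compress))
        {
            byte[] raw = Encoding.UTF8.GetBytes(body);
            gzip.Write(raw, 0, raw.Length);
        }

        // ToArray still works after the GZipStream has closed the buffer.
        var response = new HttpResponseMessage(HttpStatusCode.OK)
        {
            Content = new ByteArrayContent(buffer.ToArray())
        };
        response.Content.Headers.ContentEncoding.Add("gzip");
        return response;
    }
}
```

In the real client the equivalent change would be to await DecompressAsync<ProductDto>(result) inside GetProductBySkuAsync, so that no call in the chain blocks on Task.Result.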