- c - 在位数组中找到第一个零
- linux - Unix 显示有关匹配两种模式之一的文件的信息
- 正则表达式替换多个文件
- linux - 隐藏来自 xtrace 的命令
我们有一个面向微服务的后端堆栈。所有构建在 Nancy
之上的微服务并使用 topshelf
注册为 Windows 服务.
处理大部分流量(~5000 请求/秒)的服务之一开始在 8 台服务器中的 3 台上出现线程池饥饿问题。
这是我们在到达特定端点时遇到的异常:
System.InvalidOperationException: There were not enough free threads in the ThreadPool to complete the operation.
at System.Net.HttpWebRequest.BeginGetResponse(AsyncCallback callback, Object state)
at System.Net.Http.HttpClientHandler.StartGettingResponse(RequestState state)
at System.Net.Http.HttpClientHandler.StartRequest(Object obj)
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at RandomNamedClient.<GetProductBySkuAsync>d__20.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at ProductService.<GetBySkuAsync>d__3.MoveNext() in ...\ProductService.cs:line 34
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at ProductModule.<>c__DisplayClass15.<<.ctor>b__b>d__1d.MoveNext() in ...\ProductModule.cs:line 32
此端点调用另一个服务(不在我的团队的域内)以获取产品数据。实现如下:
Get["/product/sku/{sku}", true] = async (parameters, ctx) =>
{
string sku = parameters.sku;
var product = await productService.GetBySkuAsync(sku);
return Response.AsJson(new ProductRepresentation(product));
};
ProductService.GetBySkuAsync(string sku)
实现:
public async Task<Product> GetBySkuAsync(string sku)
{
var productDto = await randomNamedClient.GetProductBySkuAsync(sku);
if (productDto == null)
{
throw new ProductDtoNotFoundException("sku", sku);
}
var variantDto = productDto.VariantList.FirstOrDefault(v => v.Sku == sku);
if (variantDto == null)
{
throw new ProductVariantDtoNotFoundException("sku", sku);
}
return MapVariantDtoToProduct(variantDto, productDto);
}
RandomNamedClient.GetProductBySkuAsync(string sku)
实现(来自内部包):
public async Task<ProductDto> GetProductBySkuAsync(string sku)
{
HttpResponseMessage result = await this._serviceClient.GetAsync("Product?Sku=" + sku);
return result == null || result.StatusCode != HttpStatusCode.OK ? (ProductDto) null : this.Decompress<ProductDto>(result);
}
RandomNamedClient.Decompress<T>(HttpResponseMessage response)
实现:
private T Decompress<T>(HttpResponseMessage response)
{
if (!response.Content.Headers.ContentEncoding.Contains("gzip"))
return HttpContentExtensions.ReadAsAsync<T>(response.Content).Result;
using (GZipStream gzipStream = new GZipStream((Stream) new MemoryStream(response.Content.ReadAsByteArrayAsync().Result), CompressionMode.Decompress))
{
byte[] buffer = new byte[8192];
using (MemoryStream memoryStream = new MemoryStream())
{
int count;
do
{
count = gzipStream.Read(buffer, 0, 8192);
if (count > 0)
memoryStream.Write(buffer, 0, count);
}
while (count > 0);
return JsonConvert.DeserializeObject<T>(Encoding.UTF8.GetString(memoryStream.ToArray()));
}
}
}
我们所有的服务都构建为 Release/32 位。我们没有对线程池的使用做任何调整。
最佳答案
我在这段代码中看到的最大问题是 Decompress<T>
使用 Task.Result
阻止异步操作的方法.这可能会阻止当前正在处理线程池请求的线程的检索,或者更糟糕的是导致代码中的死锁(这正是您 shouldn't block on async code 的原因)。我不确定您是否看到这些请求得到了彻底处理,但如果 NancyFX 正在为您处理同步上下文的编码(看起来像 it does ),这很可能是线程池饥饿的根本原因。
您可以通过在该方法内进行所有 IO 处理来改变它 async
以及,并利用那些类已经公开的自然异步 API。或者,我绝对不建议这样做,您可以使用 ConfigureAwait(false)
无处不在。
(旁注 - 您可以使用 Stream.CopyToAsync()
简化代码)
正确的异步实现应该是这样的:
private async Task<T> DecompressAsync<T>(HttpResponseMessage response)
{
if (!response.Content.Headers.ContentEncoding.Contains("gzip"))
return await response.Content.ReadAsAsync<T>();
const int bufferSize = 8192;
using (GZipStream gzipStream = new GZipStream(
new MemoryStream(
await response.Content.ReadAsByteArrayAsync()),
CompressionMode.Decompress))
using (MemoryStream memoryStream = new MemoryStream())
{
await gzipStream.CopyToAsync(memoryStream, bufferSize);
return JsonConvert.DeserializeObject<T>(
Encoding.UTF8.GetString(memoryStream.ToArray()));
}
}
关于c# - HttpClient GetAsync 线程池饥饿,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35888198/
考虑以下服务器: public class TestServer { public static void main(String[] args) { String ksName = "/so
我正在研究工作队列处理器的设计,其中 QueueProcessor 从队列中检索命令模式对象并在新线程中执行它。 我正在尝试解决嵌套命令可能导致死锁的潜在队列锁定场景。 例如 一个 FooComman
通过使用 UNIX 管道进行进程同步,我们是否会陷入饥饿?例如: void pipesem_wait(struct pipesem *sem) { char onebyte = 'A';
这是使用 Scala 2.8 Actors。我有一个可以并行化的长时间运行的工作。它由大约 650,000 个工作单元组成。我将它分成 2600 个不同的独立子任务,并为每个子任务创建一个新角色: a
回答问题:Task.Yield - real usages?我建议使用 Task.Yield 允许池线程被其他任务重用。在这样的模式中: CancellationTokenSource cts;
我是一名优秀的程序员,十分优秀!