通过使同步方法异步进行比线程更多的远程调用
Making more remoting calls than threads by making synchronous methods async
我有一堆都是同步的远程调用(第 3 方库)。它们中的大多数都需要花费很多时间,所以我不能更频繁地使用它们,每秒约 5 到 10 次。这太慢了,因为我需要每两分钟至少调用它们 3000 次,如果服务停止一段时间,还会调用更多次。客户端上几乎没有 CPU 工作。它获取数据,检查一些简单的条件并进行另一个它必须等待的调用。
使它们异步的最佳方法是什么(以异步方式调用它们 - 我想我需要一些异步包装器)以便我可以同时发出更多请求?目前它受线程数(四个)的限制。
我正在考虑用 Task.Run
调用它们,但我阅读的每篇文章都说它用于 CPU 绑定工作并且它使用线程池线程。如果我理解正确,使用这种方法我将无法突破线程限制,对吗?那么哪种方法实际上最适合这里?
那Task.FromResult
呢?我可以异步等待比线程数量更多的此类方法吗?
public async Task<Data> GetDataTakingLotsOfTime(object id)
{
var data = remoting.GetData(id);
return await Task.FromResult(data);
}
I was thinking about calling them with Task.Run but every article I read says it's for CPU bound work and that it uses thread-pool threads.
是的,但是当您陷入同步 API 时,Task.Run()
可能是您的小麻烦,尤其是在客户端上。
您当前的 GetDataTakingLotsOfTime() 版本并不是真正的异步。 FromResult() 仅有助于抑制有关该警告的警告。
What about Task.FromResult? Can I await such methods asynchronously in a greater number than there are threads?
不清楚您的 "number of threads" 想法从何而来,但是是的,启动一个 Task 方法并稍后等待它实际上是在 ThreadPool 上运行它。但是 Task.Run 在这方面更清楚。
请注意,这不依赖于方法的 async
修饰符 - async 是一个实现细节,调用者只关心它 returns 一个任务。
Currently It's limited by the number of threads (which is four).
这需要一些解释。我不明白。
您正在执行远程调用,您的线程需要空闲等待远程调用的结果。在此等待期间,您的线程可以做一些有用的事情,例如执行其他远程调用。
当您的线程空闲地等待其他进程完成时,例如写入磁盘、查询数据库或从 Internet 获取信息,通常情况下您会在非异步函数旁边看到一个异步函数功能:Write
和 WriteAsync
、Send
和 SendAsync
。
如果在同步调用的最深层您可以访问调用的异步版本,那么您的生活将会很轻松。唉,看来你没有这样的异步版本。
您提出的使用 Task.Run
的解决方案的缺点是启动新线程(或 运行 来自线程池的线程)的开销。
您可以通过创建工作坊对象来降低此开销。在车间里,一个专用线程(一个worker),或者几个专用线程在一个输入点等待命令做某事。线程执行任务并 post 在输出点输出结果。
研讨会的用户有一个访问点(前台?),在那里他们post请求做某事,并等待结果。
为此我使用了 System.Threading.Tasks.Dataflow.BufferBlock。安装 Nuget 包 TPL 数据流。
您可以将您的工作坊专门用于 GetDataTakingLotsOfTime
;我使我的车间通用:我接受每一个实现接口 IWork 的工作:
interface IWork
{
void DoWork();
}
WorkShop 有两个BufferBlocks
:一个用于输入工作请求,一个用于输出完成的工作。车间有一个线程(或多个线程)在输入 BufferBlock
处等待,直到作业到达。 Work
完成后 post 将作业输出 BufferBlock
class WorkShop
{
public WorkShop()
{
this.workRequests = new BufferBlock<IWork>();
this.finishedWork = new BufferBlock<IWork>();
this.frontOffice = new FrontOffice(this.workRequests, this.finishedWork);
}
private readonly BufferBlock<IWork> workRequests;
private readonly BufferBlock<IWork> finishedWork;
private readonly FrontOffice frontOffice;
public FrontOffice {get{return this.frontOffice;} }
public async Task StartWorkingAsync(CancellationToken token)
{
while (await this.workRequests.OutputAvailableAsync(token)
{ // some work request at the input buffer
IWork requestedWork = this.workRequests.ReceiveAsync(token);
requestedWork.DoWork();
this.FinishedWork.Post(requestedWork);
}
// if here: no work expected anymore:
this.FinishedWork.Complete();
}
// function to close the WorkShop
public async Task CloseShopAsync()
{
// signal that no more work is to be expected:
this.WorkRequests.Complete();
// await until the worker has finished his last job for the day:
await this.FinishedWork.Completion();
}
}
TODO:对 CancellationToken.CancellationRequested
的正确反应
TODO:对工作抛出的异常做出正确反应
TODO:决定是否使用多个线程来完成工作
FrontOffice 有一个异步函数,它接受工作,将工作发送到 WorkRequests 并等待工作完成:
public async Task<IWork> OrderWorkAsync(IWork work, CancellationToken token)
{
await this.WorkRequests.SendAsync(work, token);
IWork finishedWork = await this.FinishedWork.ReceivedAsync(token);
return finishedWork;
}
因此您的进程创建了一个 WorkShop 对象并启动了一个或多个将 StartWorking 的线程。
每当任何线程(包括您的主线程)需要以异步等待方式执行某些工作时:
- 创建一个保存输入参数和 DoWork 函数的对象
- 向车间询问 FrontOffice
- 等待 OrderWorkAsync
.
class InformationGetter : IWork
{
public int Id {get; set;} // the input Id
public Data FetchedData {get; private set;} // the result from Remoting.GetData(id);
public void DoWork()
{
this.FetchedData = remoting.GetData(this.Id);
}
}
最后是你的遥控器的异步版本
async Task<Data> RemoteGetDataAsync(int id)
{
// create the job to get the information:
InformationGetter infoGetter = new InformationGetter() {Id = id};
// go to the front office of the workshop and order to do the job
await this.MyWorkShop.FrontOffice.OrderWorkAsync(infoGetter);
return infoGetter.FetchedData;
}
我有一堆都是同步的远程调用(第 3 方库)。它们中的大多数都需要花费很多时间,所以我不能更频繁地使用它们,每秒约 5 到 10 次。这太慢了,因为我需要每两分钟至少调用它们 3000 次,如果服务停止一段时间,还会调用更多次。客户端上几乎没有 CPU 工作。它获取数据,检查一些简单的条件并进行另一个它必须等待的调用。
使它们异步的最佳方法是什么(以异步方式调用它们 - 我想我需要一些异步包装器)以便我可以同时发出更多请求?目前它受线程数(四个)的限制。
我正在考虑用 Task.Run
调用它们,但我阅读的每篇文章都说它用于 CPU 绑定工作并且它使用线程池线程。如果我理解正确,使用这种方法我将无法突破线程限制,对吗?那么哪种方法实际上最适合这里?
那Task.FromResult
呢?我可以异步等待比线程数量更多的此类方法吗?
public async Task<Data> GetDataTakingLotsOfTime(object id)
{
var data = remoting.GetData(id);
return await Task.FromResult(data);
}
I was thinking about calling them with Task.Run but every article I read says it's for CPU bound work and that it uses thread-pool threads.
是的,但是当您陷入同步 API 时,Task.Run()
可能是您的小麻烦,尤其是在客户端上。
您当前的 GetDataTakingLotsOfTime() 版本并不是真正的异步。 FromResult() 仅有助于抑制有关该警告的警告。
What about Task.FromResult? Can I await such methods asynchronously in a greater number than there are threads?
不清楚您的 "number of threads" 想法从何而来,但是是的,启动一个 Task 方法并稍后等待它实际上是在 ThreadPool 上运行它。但是 Task.Run 在这方面更清楚。
请注意,这不依赖于方法的 async
修饰符 - async 是一个实现细节,调用者只关心它 returns 一个任务。
Currently It's limited by the number of threads (which is four).
这需要一些解释。我不明白。
您正在执行远程调用,您的线程需要空闲等待远程调用的结果。在此等待期间,您的线程可以做一些有用的事情,例如执行其他远程调用。
当您的线程空闲地等待其他进程完成时,例如写入磁盘、查询数据库或从 Internet 获取信息,通常情况下您会在非异步函数旁边看到一个异步函数功能:Write
和 WriteAsync
、Send
和 SendAsync
。
如果在同步调用的最深层您可以访问调用的异步版本,那么您的生活将会很轻松。唉,看来你没有这样的异步版本。
您提出的使用 Task.Run
的解决方案的缺点是启动新线程(或 运行 来自线程池的线程)的开销。
您可以通过创建工作坊对象来降低此开销。在车间里,一个专用线程(一个worker),或者几个专用线程在一个输入点等待命令做某事。线程执行任务并 post 在输出点输出结果。
研讨会的用户有一个访问点(前台?),在那里他们post请求做某事,并等待结果。
为此我使用了 System.Threading.Tasks.Dataflow.BufferBlock。安装 Nuget 包 TPL 数据流。
您可以将您的工作坊专门用于 GetDataTakingLotsOfTime
;我使我的车间通用:我接受每一个实现接口 IWork 的工作:
interface IWork
{
void DoWork();
}
WorkShop 有两个BufferBlocks
:一个用于输入工作请求,一个用于输出完成的工作。车间有一个线程(或多个线程)在输入 BufferBlock
处等待,直到作业到达。 Work
完成后 post 将作业输出 BufferBlock
class WorkShop
{
public WorkShop()
{
this.workRequests = new BufferBlock<IWork>();
this.finishedWork = new BufferBlock<IWork>();
this.frontOffice = new FrontOffice(this.workRequests, this.finishedWork);
}
private readonly BufferBlock<IWork> workRequests;
private readonly BufferBlock<IWork> finishedWork;
private readonly FrontOffice frontOffice;
public FrontOffice {get{return this.frontOffice;} }
public async Task StartWorkingAsync(CancellationToken token)
{
while (await this.workRequests.OutputAvailableAsync(token)
{ // some work request at the input buffer
IWork requestedWork = this.workRequests.ReceiveAsync(token);
requestedWork.DoWork();
this.FinishedWork.Post(requestedWork);
}
// if here: no work expected anymore:
this.FinishedWork.Complete();
}
// function to close the WorkShop
public async Task CloseShopAsync()
{
// signal that no more work is to be expected:
this.WorkRequests.Complete();
// await until the worker has finished his last job for the day:
await this.FinishedWork.Completion();
}
}
TODO:对 CancellationToken.CancellationRequested
的正确反应
TODO:对工作抛出的异常做出正确反应
TODO:决定是否使用多个线程来完成工作
FrontOffice 有一个异步函数,它接受工作,将工作发送到 WorkRequests 并等待工作完成:
public async Task<IWork> OrderWorkAsync(IWork work, CancellationToken token)
{
await this.WorkRequests.SendAsync(work, token);
IWork finishedWork = await this.FinishedWork.ReceivedAsync(token);
return finishedWork;
}
因此您的进程创建了一个 WorkShop 对象并启动了一个或多个将 StartWorking 的线程。
每当任何线程(包括您的主线程)需要以异步等待方式执行某些工作时:
- 创建一个保存输入参数和 DoWork 函数的对象
- 向车间询问 FrontOffice
- 等待 OrderWorkAsync
.
class InformationGetter : IWork
{
public int Id {get; set;} // the input Id
public Data FetchedData {get; private set;} // the result from Remoting.GetData(id);
public void DoWork()
{
this.FetchedData = remoting.GetData(this.Id);
}
}
最后是你的遥控器的异步版本
async Task<Data> RemoteGetDataAsync(int id)
{
// create the job to get the information:
InformationGetter infoGetter = new InformationGetter() {Id = id};
// go to the front office of the workshop and order to do the job
await this.MyWorkShop.FrontOffice.OrderWorkAsync(infoGetter);
return infoGetter.FetchedData;
}