通过使同步方法异步进行比线程更多的远程调用

Making more remoting calls than threads by making synchronous methods async

我有一堆都是同步的远程调用(第 3 方库)。它们中的大多数都需要花费很多时间,所以我不能更频繁地使用它们,每秒约 5 到 10 次。这太慢了,因为我需要每两分钟至少调用它们 3000 次,如果服务停止一段时间,还会调用更多次。客户端上几乎没有 CPU 工作。它获取数据,检查一些简单的条件并进行另一个它必须等待的调用。

使它们异步的最佳方法是什么(以异步方式调用它们 - 我想我需要一些异步包装器)以便我可以同时发出更多请求?目前它受线程数(四个)的限制。

我正在考虑用 Task.Run 调用它们,但我阅读的每篇文章都说它用于 CPU 绑定工作并且它使用线程池线程。如果我理解正确,使用这种方法我将无法突破线程限制,对吗?那么哪种方法实际上最适合这里?

Task.FromResult呢?我可以异步等待比线程数量更多的此类方法吗?

public async Task<Data> GetDataTakingLotsOfTime(object id)
{
    var data = remoting.GetData(id);
    return await Task.FromResult(data);
}

I was thinking about calling them with Task.Run but every article I read says it's for CPU bound work and that it uses thread-pool threads.

是的,但是当您陷入同步 API 时,Task.Run() 可能是您的小麻烦,尤其是在客户端上。

您当前的 GetDataTakingLotsOfTime() 版本并不是真正的异步。 FromResult() 仅有助于抑制有关该警告的警告。

What about Task.FromResult? Can I await such methods asynchronously in a greater number than there are threads?

不清楚您的 "number of threads" 想法从何而来,但是是的,启动一个 Task 方法并稍后等待它实际上是在 ThreadPool 上运行它。但是 Task.Run 在这方面更清楚。

请注意,这不依赖于方法的 async 修饰符 - async 是一个实现细节,调用者只关心它 returns 一个任务。

Currently It's limited by the number of threads (which is four).

这需要一些解释。我不明白。

您正在执行远程调用,您的线程需要空闲等待远程调用的结果。在此等待期间,您的线程可以做一些有用的事情,例如执行其他远程调用。

当您的线程空闲地等待其他进程完成时,例如写入磁盘、查询数据库或从 Internet 获取信息,通常情况下您会在非异步函数旁边看到一个异步函数功能:WriteWriteAsyncSendSendAsync

如果在同步调用的最深层您可以访问调用的异步版本,那么您的生活将会很轻松。唉,看来你没有这样的异步版本。

您提出的使用 Task.Run 的解决方案的缺点是启动新线程(或 运行 来自线程池的线程)的开销。

您可以通过创建工作坊对象来降低此开销。在车间里,一个专用线程(一个worker),或者几个专用线程在一个输入点等待命令做某事。线程执行任务并 post 在输出点输出结果。

研讨会的用户有一个访问点(前台?),在那里他们post请求做某事,并等待结果。

为此我使用了 System.Threading.Tasks.Dataflow.BufferBlock。安装 Nuget 包 TPL 数据流。

您可以将您的工作坊专门用于 GetDataTakingLotsOfTime;我使我的车间通用:我接受每一个实现接口 IWork 的工作:

interface IWork
{
    void DoWork();
}

WorkShop 有两个BufferBlocks:一个用于输入工作请求,一个用于输出完成的工作。车间有一个线程(或多个线程)在输入 BufferBlock 处等待,直到作业到达。 Work 完成后 post 将作业输出 BufferBlock

class WorkShop
{
    public WorkShop()
    {
         this.workRequests = new BufferBlock<IWork>();
         this.finishedWork = new BufferBlock<IWork>();
         this.frontOffice = new FrontOffice(this.workRequests, this.finishedWork);
    }

    private readonly BufferBlock<IWork> workRequests;
    private readonly BufferBlock<IWork> finishedWork;
    private readonly FrontOffice frontOffice;

    public FrontOffice {get{return this.frontOffice;} }

    public async Task StartWorkingAsync(CancellationToken token)
    {
        while (await this.workRequests.OutputAvailableAsync(token)
        {   // some work request at the input buffer
            IWork requestedWork = this.workRequests.ReceiveAsync(token);
            requestedWork.DoWork();
            this.FinishedWork.Post(requestedWork);
        }
        // if here: no work expected anymore:
        this.FinishedWork.Complete();
    }

    // function to close the WorkShop
    public async Task CloseShopAsync()
    {
         // signal that no more work is to be expected:
         this.WorkRequests.Complete();
         // await until the worker has finished his last job for the day:
         await this.FinishedWork.Completion();
    }
}

TODO:对 CancellationToken.CancellationRequested
的正确反应 TODO:对工作抛出的异常做出正确反应
TODO:决定是否使用多个线程来完成工作

FrontOffice 有一个异步函数,它接受工作,将工作发送到 WorkRequests 并等待工作完成:

public async Task<IWork> OrderWorkAsync(IWork work, CancellationToken token)
{
    await this.WorkRequests.SendAsync(work, token);
    IWork finishedWork = await this.FinishedWork.ReceivedAsync(token);
    return finishedWork;
}

因此您的进程创建了一个 WorkShop 对象并启动了一个或多个将 StartWorking 的线程。

每当任何线程(包括您的主线程)需要以异步等待方式执行某些工作时:

  • 创建一个保存输入参数和 DoWork 函数的对象
  • 向车间询问 FrontOffice
  • 等待 OrderWorkAsync

.

class InformationGetter : IWork
{
     public int Id {get; set;}                     // the input Id
     public Data FetchedData {get; private set;}   // the result from Remoting.GetData(id);
     public void DoWork()
     {
         this.FetchedData = remoting.GetData(this.Id);
     }
}

最后是你的遥控器的异步版本

async Task<Data> RemoteGetDataAsync(int id)
{
     // create the job to get the information:
     InformationGetter infoGetter = new InformationGetter() {Id = id};

     // go to the front office of the workshop and order to do the job
     await this.MyWorkShop.FrontOffice.OrderWorkAsync(infoGetter);
     return infoGetter.FetchedData;
}