限制并发异步任务

Limiting concurrent async tasks

我想使用 SSH.NET 库和 Renci.SshNet.Async 扩展将大批量(可能是 100 多个)文件上传到 FTP。我需要将并发上传的数量限制为五个,或者我发现 FTP 可以处理的任何数量。

这是我在任何限制之前的代码:

using (var sftp = new SftpClient(sftpHost, 22, sftpUser, sftpPass))
{
    var tasks = new List<Task>();
    try
    {
        sftp.Connect();

        foreach (var file in Directory.EnumerateFiles(localPath, "*.xml"))
        {
            tasks.Add(
                sftp.UploadAsync(
                    File.OpenRead(file),      // Stream input
                    Path.GetFileName(file),   // string path
                    true));                   // bool canOverride
        }

        await Task.WhenAll(tasks);
        sftp.Disconnect();
    }
    // trimmed catch
}

我读过 SemaphoreSlim,但我不完全理解它的工作原理以及它如何与 TAP 一起使用。这是基于 MSDN documentation,我将如何实现它。

我不确定使用 Task.Run 是否是解决此问题的正确方法,因为它受 I/O 约束,据我所知,Task.Run 适用于 CPU-绑定工作和 async/await 用于 I/O-绑定工作。我也不明白这些任务是如何进入(这是正确的术语)信号量的,因为它们所做的只是在其上调用 .Release()

using (var sftp = new SftpClient(sftpHost, 22, sftpUser, sftpPass))
{
    var tasks = new List<Task>();
    var semaphore = new SemaphoreSlim(5);
    try
    {
        sftp.Connect();

        foreach (var file in Directory.EnumerateFiles(localPath, "*.xml"))
        {
            tasks.Add(
                Task.Run(() =>
                {
                    sftp.UploadAsync(
                        File.OpenRead(file),      // Stream input
                        Path.GetFileName(file),   // string path
                        true));                   // bool canOverride
                    semaphore.Release();
                });
        }

        await Task.WhenAll(tasks);
        sftp.Disconnect();
    }
    // trimmed catch
}

from what I know, Task.Run is for CPU-bound work

正确。

and async/await for I/O-bound work.

没有。 await 是一个用于向异步操作添加延续的工具。 它不关心异步操作的性质。它只是让组合异步操作任何类型变得更容易。

如果你想将几个异步操作组合在一起,你可以通过制作一个 async 方法来实现,使用各种异步操作,在你需要它们的结果时 awaiting 它们(或者让它们完成),然后使用 Task 形式将该方法作为其自己的新异步操作。

在你的情况下,你的新异步操作只需要等待信号量,上传你的文件,然后释放信号量。

async Task UploadFile()
{
    await semaphore.WaitAsync();
    try
    {
        await sftp.UploadAsync(
            File.OpenRead(file),
            Path.GetFileName(file),
            true));   
    }
    finally
    {
        semaphore.Release();
    }
}

现在您可以简单地为每个文件调用该方法。

另外,因为这是这样的常见操作,您可能会觉得值得 create a new class to handle this logic 这样您就可以简单地创建一个队列,然后添加将项目添加到队列中,并让它在内部处理节流,而不是在你使用它的任何地方复制该机制。