限制并发异步任务
Limiting concurrent async tasks
我想使用 SSH.NET
库和 Renci.SshNet.Async
扩展将大批量(可能是 100 多个)文件上传到 FTP。我需要将并发上传的数量限制为五个,或者我发现 FTP 可以处理的任何数量。
这是我在任何限制之前的代码:
using (var sftp = new SftpClient(sftpHost, 22, sftpUser, sftpPass))
{
var tasks = new List<Task>();
try
{
sftp.Connect();
foreach (var file in Directory.EnumerateFiles(localPath, "*.xml"))
{
tasks.Add(
sftp.UploadAsync(
File.OpenRead(file), // Stream input
Path.GetFileName(file), // string path
true)); // bool canOverride
}
await Task.WhenAll(tasks);
sftp.Disconnect();
}
// trimmed catch
}
我读过 SemaphoreSlim
,但我不完全理解它的工作原理以及它如何与 TAP 一起使用。这是基于 MSDN documentation,我将如何实现它。
我不确定使用 Task.Run
是否是解决此问题的正确方法,因为它受 I/O 约束,据我所知,Task.Run
适用于 CPU-绑定工作和 async
/await
用于 I/O-绑定工作。我也不明白这些任务是如何进入(这是正确的术语)信号量的,因为它们所做的只是在其上调用 .Release()
。
using (var sftp = new SftpClient(sftpHost, 22, sftpUser, sftpPass))
{
var tasks = new List<Task>();
var semaphore = new SemaphoreSlim(5);
try
{
sftp.Connect();
foreach (var file in Directory.EnumerateFiles(localPath, "*.xml"))
{
tasks.Add(
Task.Run(() =>
{
sftp.UploadAsync(
File.OpenRead(file), // Stream input
Path.GetFileName(file), // string path
true)); // bool canOverride
semaphore.Release();
});
}
await Task.WhenAll(tasks);
sftp.Disconnect();
}
// trimmed catch
}
from what I know, Task.Run is for CPU-bound work
正确。
and async/await for I/O-bound work.
没有。 await
是一个用于向异步操作添加延续的工具。 它不关心异步操作的性质。它只是让组合异步操作任何类型变得更容易。
如果你想将几个异步操作组合在一起,你可以通过制作一个 async
方法来实现,使用各种异步操作,在你需要它们的结果时 await
ing 它们(或者让它们完成),然后使用 Task
形式将该方法作为其自己的新异步操作。
在你的情况下,你的新异步操作只需要等待信号量,上传你的文件,然后释放信号量。
async Task UploadFile()
{
await semaphore.WaitAsync();
try
{
await sftp.UploadAsync(
File.OpenRead(file),
Path.GetFileName(file),
true));
}
finally
{
semaphore.Release();
}
}
现在您可以简单地为每个文件调用该方法。
另外,因为这是这样的常见操作,您可能会觉得值得 create a new class to handle this logic 这样您就可以简单地创建一个队列,然后添加将项目添加到队列中,并让它在内部处理节流,而不是在你使用它的任何地方复制该机制。
我想使用 SSH.NET
库和 Renci.SshNet.Async
扩展将大批量(可能是 100 多个)文件上传到 FTP。我需要将并发上传的数量限制为五个,或者我发现 FTP 可以处理的任何数量。
这是我在任何限制之前的代码:
using (var sftp = new SftpClient(sftpHost, 22, sftpUser, sftpPass))
{
var tasks = new List<Task>();
try
{
sftp.Connect();
foreach (var file in Directory.EnumerateFiles(localPath, "*.xml"))
{
tasks.Add(
sftp.UploadAsync(
File.OpenRead(file), // Stream input
Path.GetFileName(file), // string path
true)); // bool canOverride
}
await Task.WhenAll(tasks);
sftp.Disconnect();
}
// trimmed catch
}
我读过 SemaphoreSlim
,但我不完全理解它的工作原理以及它如何与 TAP 一起使用。这是基于 MSDN documentation,我将如何实现它。
我不确定使用 Task.Run
是否是解决此问题的正确方法,因为它受 I/O 约束,据我所知,Task.Run
适用于 CPU-绑定工作和 async
/await
用于 I/O-绑定工作。我也不明白这些任务是如何进入(这是正确的术语)信号量的,因为它们所做的只是在其上调用 .Release()
。
using (var sftp = new SftpClient(sftpHost, 22, sftpUser, sftpPass))
{
var tasks = new List<Task>();
var semaphore = new SemaphoreSlim(5);
try
{
sftp.Connect();
foreach (var file in Directory.EnumerateFiles(localPath, "*.xml"))
{
tasks.Add(
Task.Run(() =>
{
sftp.UploadAsync(
File.OpenRead(file), // Stream input
Path.GetFileName(file), // string path
true)); // bool canOverride
semaphore.Release();
});
}
await Task.WhenAll(tasks);
sftp.Disconnect();
}
// trimmed catch
}
from what I know, Task.Run is for CPU-bound work
正确。
and async/await for I/O-bound work.
没有。 await
是一个用于向异步操作添加延续的工具。 它不关心异步操作的性质。它只是让组合异步操作任何类型变得更容易。
如果你想将几个异步操作组合在一起,你可以通过制作一个 async
方法来实现,使用各种异步操作,在你需要它们的结果时 await
ing 它们(或者让它们完成),然后使用 Task
形式将该方法作为其自己的新异步操作。
在你的情况下,你的新异步操作只需要等待信号量,上传你的文件,然后释放信号量。
async Task UploadFile()
{
await semaphore.WaitAsync();
try
{
await sftp.UploadAsync(
File.OpenRead(file),
Path.GetFileName(file),
true));
}
finally
{
semaphore.Release();
}
}
现在您可以简单地为每个文件调用该方法。
另外,因为这是这样的常见操作,您可能会觉得值得 create a new class to handle this logic 这样您就可以简单地创建一个队列,然后添加将项目添加到队列中,并让它在内部处理节流,而不是在你使用它的任何地方复制该机制。