使用 C# Parallel.ForEach 循环处理 SFTP 文件时,文件没有被真正下载
Processing SFTP files using C# Parallel.ForEach loop not processing downloads
我正在使用 Renci SSH.NET 软件包(2016 版)从外部服务器下载文件。通常大约每 6 秒才能下载一个文件,当有数千个文件时,这个速度非常糟糕。我最近尝试把 foreach 循环改为 Parallel.ForEach。这样修改之后,每个文件的下载时间缩短到了 1.5 秒。但是当我检查这些文件时,它们全都是 0 KB,也就是说实际上什么都没有下载下来。这个并行循环有什么问题吗?我是 C# 新手,正在尝试缩短下载时间。
// Serializes reconnect attempts: every worker shares the single `sftp` session, so only one
// of them may try to re-open it at a time.
var connectionGate = new object();
// Guards the shared counters and running statistics that every worker updates.
var progressGate = new object();

// Downloads every entry of summary.RemoteFiles in parallel, skipping files that already exist
// locally, reporting progress through StatusUpdate / ProcessFileComplete, and optionally
// deleting the remote file afterwards.
Parallel.ForEach(summary.RemoteFiles, (f, loopstate) =>
{
    bool abort;

    // Are we still connected? If not, reestablish a connection for up to a max of "MaxReconnectAttempts".
    // The check runs under a lock so concurrent workers do not all call Connect() at once.
    lock (connectionGate)
    {
        if (!connectionLost && !sftp.IsConnected)
        {
            int maxAttempts = Convert.ToInt32(ConfigurationManager.AppSettings["MaxReconnectAttempts"]);
            StatusUpdate(this, new Types.StatusUpdateEventArgs() { message = "SFTP Service has been disconnected from remote system, attempting to reconnect (" + sftpConnInfo.Host + ":" + sftpConnInfo.Port.ToString() + remotePath + " - Attempt 1 of " + maxAttempts.ToString() + ")", Location = locationName });
            for (int attempts = 1; attempts <= maxAttempts; attempts++)
            {
                // NOTE(review): SSH.NET's Connect() is believed to signal failure by throwing rather
                // than returning while disconnected - confirm whether the retries need a try/catch.
                sftp.Connect();
                if (sftp.IsConnected)
                {
                    StatusUpdate(this, new Types.StatusUpdateEventArgs() { message = "SFTP Service - Connection reestablished (" + remotePath + ")", Location = locationName });
                    break;
                }

                if ((attempts + 1) <= maxAttempts)
                {
                    StatusUpdate(this, new Types.StatusUpdateEventArgs() { message = "SFTP Service still disconnected from remote system, preparing another reconnect attempt (" + sftpConnInfo.Host + ":" + sftpConnInfo.Port.ToString() + remotePath + " - Attempt " + (attempts + 1).ToString() + " of " + maxAttempts.ToString() + ")", Location = locationName });
                    System.Threading.Thread.Sleep(2000);
                }
                else
                {
                    // Max reconnect attempts reached - end the session and ensure the appropriate "failure" workflow is triggered
                    connectionLost = true;
                }
            }
        }

        abort = connectionLost;
    }

    if (abort)
    {
        // Break() only prevents later iterations from starting; it does not leave the current
        // one, so return explicitly instead of attempting a download on a dead connection.
        loopstate.Break();
        return;
    }

    string localPath = localSaveLocation + f.FileName;

    // Claim this file's ordinal atomically; `totalFileCount++` alone is not thread-safe.
    string fileNumberText;
    lock (progressGate)
    {
        totalFileCount++;
        fileNumberText = totalFileCount.ToString();
    }

    // True only while a local file has been opened but its download has not finished.
    bool partialFileCreated = false;
    try
    {
        if (!System.IO.File.Exists(localPath))
        {
            System.Diagnostics.Debug.WriteLine("\tDownloading file " + fileNumberText + "(" + f.FileName + ")");

            // Per-file timings are kept in locals so parallel downloads cannot overwrite each other.
            DateTime fileStart = DateTime.Now;

            // The stream must be disposed: leaving it open keeps buffered data unflushed and the
            // file handle locked, which is the likely cause of the 0 KB files.
            using (System.IO.Stream localFile = System.IO.File.OpenWrite(localPath))
            {
                partialFileCreated = true;
                sftp.DownloadFile(f.FullName, localFile);
            }

            DateTime fileEnd = DateTime.Now;
            partialFileCreated = false;

            TimeSpan fileElapsed = fileEnd.Subtract(fileStart);
            string fileElapsedText = fileElapsed.TotalSeconds.ToString("#.####") + " seconds";
            System.Diagnostics.Debug.WriteLine("\tCompleted downloading file in " + fileElapsedText + " " + "(" + f.FileName + ")");

            Types.ProcessFileCompleteEventArgs completed;
            lock (progressGate)
            {
                // Publish this file's timings to the shared variables as one consistent snapshot.
                start = fileStart;
                end = fileEnd;
                timeElapsed = fileElapsed;
                elapsedTimeString = fileElapsedText;

                runningSeconds += fileElapsed.TotalSeconds;
                runningAvg = runningSeconds / Convert.ToDouble(totalFileCount);
                estimatedSecondsRemaining = (summary.RemoteFiles.Count - totalFileCount) * runningAvg;
                downloadedFileCount++;

                completed = new Types.ProcessFileCompleteEventArgs() { downloadSuccessful = true, elapsedTime = fileElapsed.TotalSeconds, fileName = f.FileName, fullLocalPath = localPath, Location = locationName, FilesDownloaded = totalFileCount, FilesRemaining = (summary.RemoteFiles.Count - totalFileCount), AvgSecondsPerDownload = runningAvg, TotalSecondsElapsed = runningSeconds, EstimatedTimeRemaining = TimeSpan.FromSeconds(estimatedSecondsRemaining) };
            }

            // Raised outside the lock so subscribers cannot stall the other workers.
            ProcessFileComplete(this, completed);
            f.FileDownloaded = true;
            if (deleteAfterDownload)
            {
                sftp.DeleteFile(f.FullName);
            }
        }
        else
        {
            System.Diagnostics.Debug.WriteLine("\tFile " + fileNumberText + "(" + f.FileName + ") already exists locally");

            Types.ProcessFileCompleteEventArgs completed;
            lock (progressGate)
            {
                downloadedFileCount++;
                completed = new Types.ProcessFileCompleteEventArgs() { downloadSuccessful = true, elapsedTime = 0, fileName = f.FileName + " (File already exists locally)", fullLocalPath = localPath, Location = locationName, FilesDownloaded = totalFileCount, FilesRemaining = (summary.RemoteFiles.Count - totalFileCount), AvgSecondsPerDownload = runningAvg, TotalSecondsElapsed = runningSeconds, EstimatedTimeRemaining = TimeSpan.FromSeconds(estimatedSecondsRemaining) };
            }

            ProcessFileComplete(this, completed);
            f.FileDownloaded = true;
            if (deleteAfterDownload)
            {
                sftp.DeleteFile(f.FullName);
            }
        }
    }
    catch (System.Exception ex)
    {
        // Never swallow the failure silently; project-specific logging can be added here.
        System.Diagnostics.Debug.WriteLine("\tFailed to process file " + fileNumberText + "(" + f.FileName + "): " + ex);

        // Remove an incomplete download. Otherwise a later run would see the file, report it as
        // "already exists locally" and could delete the remote original.
        if (partialFileCreated)
        {
            try
            {
                System.IO.File.Delete(localPath);
            }
            catch (System.Exception cleanupEx)
            {
                System.Diagnostics.Debug.WriteLine("\tCould not remove incomplete file (" + f.FileName + "): " + cleanupEx);
            }
        }
    }
});
我不确定您为什么会得到空文件,不过我怀疑是因为您没有关闭 localFile 流。
不过,即使您的代码能够正常工作,只要所有下载都使用同一个连接,您也几乎得不到任何性能提升,因为 SFTP 传输速度往往受限于网络延迟或 CPU。您必须使用多个连接才能克服这一限制。
请参阅我在 Server Fault 上的回答:factors that affect SFTP transfer speed(影响 SFTP 传输速度的因素)。
实现一些连接池并每次选择一个空闲连接。
简单示例:
// Pool of idle SFTP connections. Each worker takes one (or opens a new one when the pool is
// empty) and hands it back after its download, so at most `maxConnections` sessions are open.
var clients = new ConcurrentBag<SftpClient>();
var opts = new ParallelOptions { MaxDegreeOfParallelism = maxConnections };
try
{
    Parallel.ForEach(files, opts, (f, loopstate) =>
    {
        if (!clients.TryTake(out var client))
        {
            client = new SftpClient(hostName, userName, password);
        }

        try
        {
            // Covers both a brand-new client and a pooled one whose session has dropped.
            if (!client.IsConnected)
            {
                client.Connect();
            }

            string localPath = Path.Combine(destPath, f.Name);
            Console.WriteLine(
                "Thread {0}, Connection {1}, File {2} => {3}",
                Thread.CurrentThread.ManagedThreadId, client.GetHashCode(),
                f.FullName, localPath);

            // `using` closes the local file even if the download throws.
            using (var stream = File.Create(localPath))
            {
                client.DownloadFile(f.FullName, stream);
            }
        }
        catch
        {
            // A client that failed may be in a broken state: dispose it instead of returning
            // it to the pool, then let Parallel.ForEach surface the error.
            client.Dispose();
            throw;
        }

        clients.Add(client);
    });
}
finally
{
    // Runs even when the loop throws, so pooled connections are always closed.
    Console.WriteLine("Closing {0} connections", clients.Count);
    foreach (var pooled in clients)
    {
        pooled.Dispose();
    }
}
另一种方法是启动固定数量的线程,每个线程有一个连接,并让它们从队列中选择文件。
有关实现示例,请参阅我关于 WinSCP .NET 程序集的文章:
Automating transfers in parallel connections over SFTP/FTP protocol
关于FTP的类似问题:
我正在使用 Renci SSH.NET 软件包(2016 版)从外部服务器下载文件。通常大约每 6 秒才能下载一个文件,当有数千个文件时,这个速度非常糟糕。我最近尝试把 foreach 循环改为 Parallel.ForEach。这样修改之后,每个文件的下载时间缩短到了 1.5 秒。但是当我检查这些文件时,它们全都是 0 KB,也就是说实际上什么都没有下载下来。这个并行循环有什么问题吗?我是 C# 新手,正在尝试缩短下载时间。
// Serializes reconnect attempts: every worker shares the single `sftp` session, so only one
// of them may try to re-open it at a time.
var connectionGate = new object();
// Guards the shared counters and running statistics that every worker updates.
var progressGate = new object();

// Downloads every entry of summary.RemoteFiles in parallel, skipping files that already exist
// locally, reporting progress through StatusUpdate / ProcessFileComplete, and optionally
// deleting the remote file afterwards.
Parallel.ForEach(summary.RemoteFiles, (f, loopstate) =>
{
    bool abort;

    // Are we still connected? If not, reestablish a connection for up to a max of "MaxReconnectAttempts".
    // The check runs under a lock so concurrent workers do not all call Connect() at once.
    lock (connectionGate)
    {
        if (!connectionLost && !sftp.IsConnected)
        {
            int maxAttempts = Convert.ToInt32(ConfigurationManager.AppSettings["MaxReconnectAttempts"]);
            StatusUpdate(this, new Types.StatusUpdateEventArgs() { message = "SFTP Service has been disconnected from remote system, attempting to reconnect (" + sftpConnInfo.Host + ":" + sftpConnInfo.Port.ToString() + remotePath + " - Attempt 1 of " + maxAttempts.ToString() + ")", Location = locationName });
            for (int attempts = 1; attempts <= maxAttempts; attempts++)
            {
                // NOTE(review): SSH.NET's Connect() is believed to signal failure by throwing rather
                // than returning while disconnected - confirm whether the retries need a try/catch.
                sftp.Connect();
                if (sftp.IsConnected)
                {
                    StatusUpdate(this, new Types.StatusUpdateEventArgs() { message = "SFTP Service - Connection reestablished (" + remotePath + ")", Location = locationName });
                    break;
                }

                if ((attempts + 1) <= maxAttempts)
                {
                    StatusUpdate(this, new Types.StatusUpdateEventArgs() { message = "SFTP Service still disconnected from remote system, preparing another reconnect attempt (" + sftpConnInfo.Host + ":" + sftpConnInfo.Port.ToString() + remotePath + " - Attempt " + (attempts + 1).ToString() + " of " + maxAttempts.ToString() + ")", Location = locationName });
                    System.Threading.Thread.Sleep(2000);
                }
                else
                {
                    // Max reconnect attempts reached - end the session and ensure the appropriate "failure" workflow is triggered
                    connectionLost = true;
                }
            }
        }

        abort = connectionLost;
    }

    if (abort)
    {
        // Break() only prevents later iterations from starting; it does not leave the current
        // one, so return explicitly instead of attempting a download on a dead connection.
        loopstate.Break();
        return;
    }

    string localPath = localSaveLocation + f.FileName;

    // Claim this file's ordinal atomically; `totalFileCount++` alone is not thread-safe.
    string fileNumberText;
    lock (progressGate)
    {
        totalFileCount++;
        fileNumberText = totalFileCount.ToString();
    }

    // True only while a local file has been opened but its download has not finished.
    bool partialFileCreated = false;
    try
    {
        if (!System.IO.File.Exists(localPath))
        {
            System.Diagnostics.Debug.WriteLine("\tDownloading file " + fileNumberText + "(" + f.FileName + ")");

            // Per-file timings are kept in locals so parallel downloads cannot overwrite each other.
            DateTime fileStart = DateTime.Now;

            // The stream must be disposed: leaving it open keeps buffered data unflushed and the
            // file handle locked, which is the likely cause of the 0 KB files.
            using (System.IO.Stream localFile = System.IO.File.OpenWrite(localPath))
            {
                partialFileCreated = true;
                sftp.DownloadFile(f.FullName, localFile);
            }

            DateTime fileEnd = DateTime.Now;
            partialFileCreated = false;

            TimeSpan fileElapsed = fileEnd.Subtract(fileStart);
            string fileElapsedText = fileElapsed.TotalSeconds.ToString("#.####") + " seconds";
            System.Diagnostics.Debug.WriteLine("\tCompleted downloading file in " + fileElapsedText + " " + "(" + f.FileName + ")");

            Types.ProcessFileCompleteEventArgs completed;
            lock (progressGate)
            {
                // Publish this file's timings to the shared variables as one consistent snapshot.
                start = fileStart;
                end = fileEnd;
                timeElapsed = fileElapsed;
                elapsedTimeString = fileElapsedText;

                runningSeconds += fileElapsed.TotalSeconds;
                runningAvg = runningSeconds / Convert.ToDouble(totalFileCount);
                estimatedSecondsRemaining = (summary.RemoteFiles.Count - totalFileCount) * runningAvg;
                downloadedFileCount++;

                completed = new Types.ProcessFileCompleteEventArgs() { downloadSuccessful = true, elapsedTime = fileElapsed.TotalSeconds, fileName = f.FileName, fullLocalPath = localPath, Location = locationName, FilesDownloaded = totalFileCount, FilesRemaining = (summary.RemoteFiles.Count - totalFileCount), AvgSecondsPerDownload = runningAvg, TotalSecondsElapsed = runningSeconds, EstimatedTimeRemaining = TimeSpan.FromSeconds(estimatedSecondsRemaining) };
            }

            // Raised outside the lock so subscribers cannot stall the other workers.
            ProcessFileComplete(this, completed);
            f.FileDownloaded = true;
            if (deleteAfterDownload)
            {
                sftp.DeleteFile(f.FullName);
            }
        }
        else
        {
            System.Diagnostics.Debug.WriteLine("\tFile " + fileNumberText + "(" + f.FileName + ") already exists locally");

            Types.ProcessFileCompleteEventArgs completed;
            lock (progressGate)
            {
                downloadedFileCount++;
                completed = new Types.ProcessFileCompleteEventArgs() { downloadSuccessful = true, elapsedTime = 0, fileName = f.FileName + " (File already exists locally)", fullLocalPath = localPath, Location = locationName, FilesDownloaded = totalFileCount, FilesRemaining = (summary.RemoteFiles.Count - totalFileCount), AvgSecondsPerDownload = runningAvg, TotalSecondsElapsed = runningSeconds, EstimatedTimeRemaining = TimeSpan.FromSeconds(estimatedSecondsRemaining) };
            }

            ProcessFileComplete(this, completed);
            f.FileDownloaded = true;
            if (deleteAfterDownload)
            {
                sftp.DeleteFile(f.FullName);
            }
        }
    }
    catch (System.Exception ex)
    {
        // Never swallow the failure silently; project-specific logging can be added here.
        System.Diagnostics.Debug.WriteLine("\tFailed to process file " + fileNumberText + "(" + f.FileName + "): " + ex);

        // Remove an incomplete download. Otherwise a later run would see the file, report it as
        // "already exists locally" and could delete the remote original.
        if (partialFileCreated)
        {
            try
            {
                System.IO.File.Delete(localPath);
            }
            catch (System.Exception cleanupEx)
            {
                System.Diagnostics.Debug.WriteLine("\tCould not remove incomplete file (" + f.FileName + "): " + cleanupEx);
            }
        }
    }
});
我不确定您为什么会得到空文件,不过我怀疑是因为您没有关闭 localFile 流。
不过,即使您的代码能够正常工作,只要所有下载都使用同一个连接,您也几乎得不到任何性能提升,因为 SFTP 传输速度往往受限于网络延迟或 CPU。您必须使用多个连接才能克服这一限制。
请参阅我在 Server Fault 上的回答:factors that affect SFTP transfer speed(影响 SFTP 传输速度的因素)。
实现一些连接池并每次选择一个空闲连接。
简单示例:
// Pool of idle SFTP connections. Each worker takes one (or opens a new one when the pool is
// empty) and hands it back after its download, so at most `maxConnections` sessions are open.
var clients = new ConcurrentBag<SftpClient>();
var opts = new ParallelOptions { MaxDegreeOfParallelism = maxConnections };
try
{
    Parallel.ForEach(files, opts, (f, loopstate) =>
    {
        if (!clients.TryTake(out var client))
        {
            client = new SftpClient(hostName, userName, password);
        }

        try
        {
            // Covers both a brand-new client and a pooled one whose session has dropped.
            if (!client.IsConnected)
            {
                client.Connect();
            }

            string localPath = Path.Combine(destPath, f.Name);
            Console.WriteLine(
                "Thread {0}, Connection {1}, File {2} => {3}",
                Thread.CurrentThread.ManagedThreadId, client.GetHashCode(),
                f.FullName, localPath);

            // `using` closes the local file even if the download throws.
            using (var stream = File.Create(localPath))
            {
                client.DownloadFile(f.FullName, stream);
            }
        }
        catch
        {
            // A client that failed may be in a broken state: dispose it instead of returning
            // it to the pool, then let Parallel.ForEach surface the error.
            client.Dispose();
            throw;
        }

        clients.Add(client);
    });
}
finally
{
    // Runs even when the loop throws, so pooled connections are always closed.
    Console.WriteLine("Closing {0} connections", clients.Count);
    foreach (var pooled in clients)
    {
        pooled.Dispose();
    }
}
另一种方法是启动固定数量的线程,每个线程有一个连接,并让它们从队列中选择文件。
有关实现示例,请参阅我关于 WinSCP .NET 程序集的文章:
Automating transfers in parallel connections over SFTP/FTP protocol
关于FTP的类似问题: