如何限制并行任务数运行?

How to limit number of parallel tasks running?

我有代码可以侦听文件夹并检测放入其中的新文件。 目前,每个放置的新文件都会调用 WatcherOnCreated 方法,该方法会创建一个新任务来与其他文件并行加密文件。

我想限制发生的并行文件加密的数量。

我尝试使用计数为 5 的信号量,这样无论放入多少文件,都只能同时进行 5 次加密。

但是,它不起作用并导致我的程序无法响应。

class Test
{
        private static LimitEncryptionSemaphore;

        public Test()
        {
            LimitEncryptionSemaphore = new SemaphoreSlim(5);
        }
        
        // some function which calls WatcherOnCreated
         
        private void WatcherOnCreated(string filePath, WatcherChangeTypes changeType)
        {
            string fileName = Path.GetFileName(filePath);

            Logger.Debug($"A created item {fileName} was detected in drop folder.");

            LimitEncryptionSemaphore.WaitAsync();
            // FireAndForget calls Task.Run(Func<Task>) with the callback function
            TaskFactory.FireAndForget(async () =>
            {
                try
                {
                    await ExponentialBackoffPolicy.ExecuteAsync(async () =>
                    {
                        using (DeviceData deviceData = ReadDeviceData(filePath))
                        {
                            // Raise the event.
                            await OnDeviceDataAvailable(FolderDevice, deviceData);
                        }
                    });

                    if (FileSystem.Exists(filePath))
                    {
                        // Delete the file once the handler is done.
                        FileSystem.DeleteFile(filePath);

                        Logger.Debug($"{filePath} was deleted.");
                    }
                }
                catch (Exception ex)
                {
                    Logger.Error(ex);
                }
                finally
                {
                    // Release semaphore
                    LimitEncryptionSemaphore.Release();
                }
            });

        }

我认为,由于您无法控制丢弃的文件数量,因此最好使用并发队列和循环处理排队的条目。

在文件观察器事件队列中队列中的项目并启动后台工作程序来处理数据。如果队列太长,您可以启动一个新的后台进程,但是如果您这样做,您可能会耗尽服务器处理器。

您还可以使用带有参数的 Parallel.For 循环,如果队列比您喜欢的长,您希望开始的最大值,在您的情况下为 5.

代码有几个问题:

  1. LimitEncryptionSemaphore.WaitAsync 返回的任务被忽略,而不是被 awaited。
  2. LimitEncryptionSemaphore.WaitAsync 的调用方法与 LimitEncryptionSemaphore.Release.
  3. 不同

修复:

private void WatcherOnCreated(string filePath, WatcherChangeTypes changeType)
{
  string fileName = Path.GetFileName(filePath);
  Logger.Debug($"A created item {fileName} was detected in drop folder.");

  TaskFactory.FireAndForget(async () =>
  {
    await LimitEncryptionSemaphore.WaitAsync();
    try
    {
      ...
    }
    catch (Exception ex)
    {
      ...
    }
    finally
    {
      LimitEncryptionSemaphore.Release();
    }
  });
}

TaskScheduler 的 .Net 文档有一个实现具有最大并行度的调度程序的示例。请看看那个 https://docs.microsoft.com/en-us/dotnet/api/system.threading.tasks.taskscheduler?view=net-5.0