将 SemaphoreSlim 与 Parallel.ForEach 一起使用

Using SemaphoreSlim with Parallel.ForEach

这就是我想要实现的目标。假设我有一个进程每分钟运行一次并执行一些 I/O 操作。我想要 5 个线程同时执行并执行操作。假设如果 2 个线程花费的时间超过一分钟,并且当进程在一分钟后再次运行时,它应该同时执行 3 个线程,因为 2 个线程已经在执行一些操作。

所以,我使用了SemaphoreSlimParallel.ForEach的组合。请让我知道这是实现此目的的正确方法还是有其他更好的方法。

private static SemaphoreSlim _semaphoreSlim = new SemaphoreSlim(5);

private async Task ExecuteAsync()
{
    try
    {
        var availableThreads = _semaphoreSlim.CurrentCount;

        if (availableThreads > 0)
        {
            var lists = await _feedSourceService.GetListAsync(availableThreads); // select @top(availableThreads) * from table

            Parallel.ForEach(
                lists,
                new ParallelOptions
                {
                    MaxDegreeOfParallelism = availableThreads
                },
                async item =>
                {
                    await _semaphoreSlim.WaitAsync();

                    try
                    {
                        // I/O operations
                    }
                    finally
                    {
                        _semaphoreSlim.Release();
                    }
                });
        }
    }
    catch (Exception ex)
    {
        _logger.LogError(ex.Message, ex);
    }
}

Let's say I have a process which runs every minute and performs some I/O operations... Suppose if 2 threads took longer than a minute and when the process runs again after a minute, it should execute 3 threads simultaneously as 2 threads are already doing some operations.

这种问题描述有点常见,但正确编码却出奇地困难。这是因为您有一个轮询式计时器(基于时间)试图定期 调整 节流机制。正确地做到这一点是相当困难的。

因此,我建议的第一件事是更改问题描述。考虑让轮询机制读取 all 未完成的工作,然后从那里使用正常节流(例如,将 then 添加到执行受限的 ActionBlock)。

就是说,如果您希望继续走更复杂的路径,像这样的代码可以避免 Parallelasync 问题:

private static SemaphoreSlim _semaphoreSlim = new SemaphoreSlim(5);

private async Task ExecuteAsync()
{
    try
    {
        var availableThreads = _semaphoreSlim.CurrentCount;

        if (availableThreads > 0)
        {
            var lists = await _feedSourceService.GetListAsync(availableThreads); // select @top(availableThreads) * from table

            var tasks = lists.Select(
                async item =>
                {
                    await _semaphoreSlim.WaitAsync();

                    try
                    {
                        // I/O operations
                    }
                    finally
                    {
                        _semaphoreSlim.Release();
                    }
                }).ToList();
            await Task.WhenAll(tasks);
        }
    }
    catch (Exception ex)
    {
        _logger.LogError(ex.Message, ex);
    }
}