将 SemaphoreSlim 与 Parallel.ForEach 一起使用
Using SemaphoreSlim with Parallel.ForEach
这就是我想要实现的目标。假设我有一个进程每分钟运行一次并执行一些 I/O 操作。我想要 5 个线程同时执行并执行操作。假设如果 2 个线程花费的时间超过一分钟,并且当进程在一分钟后再次运行时,它应该同时执行 3 个线程,因为 2 个线程已经在执行一些操作。
所以,我使用了SemaphoreSlim
和Parallel.ForEach
的组合。请让我知道这是实现此目的的正确方法还是有其他更好的方法。
private static SemaphoreSlim _semaphoreSlim = new SemaphoreSlim(5);
private async Task ExecuteAsync()
{
try
{
var availableThreads = _semaphoreSlim.CurrentCount;
if (availableThreads > 0)
{
var lists = await _feedSourceService.GetListAsync(availableThreads); // select @top(availableThreads) * from table
Parallel.ForEach(
lists,
new ParallelOptions
{
MaxDegreeOfParallelism = availableThreads
},
async item =>
{
await _semaphoreSlim.WaitAsync();
try
{
// I/O operations
}
finally
{
_semaphoreSlim.Release();
}
});
}
}
catch (Exception ex)
{
_logger.LogError(ex.Message, ex);
}
}
Let's say I have a process which runs every minute and performs some I/O operations... Suppose if 2 threads took longer than a minute and when the process runs again after a minute, it should execute 3 threads simultaneously as 2 threads are already doing some operations.
这种问题描述有点常见,但正确编码却出奇地困难。这是因为您有一个轮询式计时器(基于时间)试图定期 调整 节流机制。正确地做到这一点是相当困难的。
因此,我建议的第一件事是更改问题描述。考虑让轮询机制读取 all 未完成的工作,然后从那里使用正常节流(例如,将 then 添加到执行受限的 ActionBlock
)。
就是说,如果您希望继续走更复杂的路径,像这样的代码可以避免 Parallel
和 async
问题:
private static SemaphoreSlim _semaphoreSlim = new SemaphoreSlim(5);
private async Task ExecuteAsync()
{
try
{
var availableThreads = _semaphoreSlim.CurrentCount;
if (availableThreads > 0)
{
var lists = await _feedSourceService.GetListAsync(availableThreads); // select @top(availableThreads) * from table
var tasks = lists.Select(
async item =>
{
await _semaphoreSlim.WaitAsync();
try
{
// I/O operations
}
finally
{
_semaphoreSlim.Release();
}
}).ToList();
await Task.WhenAll(tasks);
}
}
catch (Exception ex)
{
_logger.LogError(ex.Message, ex);
}
}
这就是我想要实现的目标。假设我有一个进程每分钟运行一次并执行一些 I/O 操作。我想要 5 个线程同时执行并执行操作。假设如果 2 个线程花费的时间超过一分钟,并且当进程在一分钟后再次运行时,它应该同时执行 3 个线程,因为 2 个线程已经在执行一些操作。
所以,我使用了SemaphoreSlim
和Parallel.ForEach
的组合。请让我知道这是实现此目的的正确方法还是有其他更好的方法。
private static SemaphoreSlim _semaphoreSlim = new SemaphoreSlim(5);
private async Task ExecuteAsync()
{
try
{
var availableThreads = _semaphoreSlim.CurrentCount;
if (availableThreads > 0)
{
var lists = await _feedSourceService.GetListAsync(availableThreads); // select @top(availableThreads) * from table
Parallel.ForEach(
lists,
new ParallelOptions
{
MaxDegreeOfParallelism = availableThreads
},
async item =>
{
await _semaphoreSlim.WaitAsync();
try
{
// I/O operations
}
finally
{
_semaphoreSlim.Release();
}
});
}
}
catch (Exception ex)
{
_logger.LogError(ex.Message, ex);
}
}
Let's say I have a process which runs every minute and performs some I/O operations... Suppose if 2 threads took longer than a minute and when the process runs again after a minute, it should execute 3 threads simultaneously as 2 threads are already doing some operations.
这种问题描述有点常见,但正确编码却出奇地困难。这是因为您有一个轮询式计时器(基于时间)试图定期 调整 节流机制。正确地做到这一点是相当困难的。
因此,我建议的第一件事是更改问题描述。考虑让轮询机制读取 all 未完成的工作,然后从那里使用正常节流(例如,将 then 添加到执行受限的 ActionBlock
)。
就是说,如果您希望继续走更复杂的路径,像这样的代码可以避免 Parallel
和 async
问题:
private static SemaphoreSlim _semaphoreSlim = new SemaphoreSlim(5);
private async Task ExecuteAsync()
{
try
{
var availableThreads = _semaphoreSlim.CurrentCount;
if (availableThreads > 0)
{
var lists = await _feedSourceService.GetListAsync(availableThreads); // select @top(availableThreads) * from table
var tasks = lists.Select(
async item =>
{
await _semaphoreSlim.WaitAsync();
try
{
// I/O operations
}
finally
{
_semaphoreSlim.Release();
}
}).ToList();
await Task.WhenAll(tasks);
}
}
catch (Exception ex)
{
_logger.LogError(ex.Message, ex);
}
}