C# 并发:使用多个 AutoResetEvent 是个好主意吗?
C# concurrent: Is it a good idea to use many AutoResetEvent?
假设有很多线程调用 Do()
,只有一个工作线程处理实际工作。
void Do(Job job)
{
concurrentQueue.Enqueue(job);
// wait for job done
}
void workerThread()
{
while (true)
{
Job job;
if (concurrentQueue.TryDequeue(out job))
{
// do job
}
}
}
Do() 应该等到 return 之前的工作完成。所以我写了下面的代码:
class Task
{
public Job job;
public AutoResetEvent ev;
}
void Do(Job job)
{
using (var ev = new AutoResetEvent(false))
{
concurrentQueue.Enqueue(new Task { job = job, ev = ev }));
ev.WaitOne();
}
}
void workerThread()
{
while (true)
{
Task task;
if (concurrentQueue.TryDequeue(out task))
{
// do job
task.ev.Set();
}
}
}
经过一些测试,我发现它按预期工作。但是我不确定分配许多 AutoResetEvents 是否是一个好方法,或者是否有更好的方法来完成?
从同步的角度来看,这工作正常。
不过这样做好像没什么用。如果你想一个接一个地执行作业,你可以使用锁:
lock (lockObject) {
RunJob();
}
您使用此代码的目的是什么?
还有一个效率问题,因为每个任务都会创建一个 OS 事件并等待它。如果您使用更现代的 TaskCompletionSource
如果您同步等待该任务,这将在引擎盖下使用相同的东西。您可以使用异步等待 (await myTCS.Task;
) 来提高效率。当然,这会用 async/await 感染整个调用堆栈。如果这是一个相当低的交易量操作,您将不会获得太多收益。
总的来说,我认为可行,尽管当您说 "many" 线程正在调用 Do() 时,这可能无法很好地扩展……挂起的线程使用资源。
此代码的另一个问题是,在空闲时间,您将在 "workerThread" 中有一个 "hard loop",这将导致您的应用程序 return 高 CPU 利用率.您可能希望将此代码添加到 "workerThread":
if (concurrentQueue.IsEmpty) Thread.Sleep(1);
您可能还想为 WaitOne 调用引入超时以避免日志堵塞。
由于所有客户端都必须等待一个线程来完成这项工作,因此没有真正需要使用队列。所以我建议使用 Monitor
class instead, and specifically the Wait/Pulse 功能。虽然它有点低级和冗长。
class Worker<TResult> : IDisposable
{
private readonly object _outerLock = new object();
private readonly object _innerLock = new object();
private Func<TResult> _currentJob;
private TResult _currentResult;
private Exception _currentException;
private bool _disposed;
public Worker()
{
var thread = new Thread(MainLoop);
thread.IsBackground = true;
thread.Start();
}
private void MainLoop()
{
lock (_innerLock)
{
while (true)
{
Monitor.Wait(_innerLock); // Wait for client requests
if (_disposed) break;
try
{
_currentResult = _currentJob.Invoke();
_currentException = null;
}
catch (Exception ex)
{
_currentException = ex;
_currentResult = default;
}
Monitor.Pulse(_innerLock); // Notify the waiting client that the job is done
}
} // We are done
}
public TResult DoWork(Func<TResult> job)
{
TResult result;
Exception exception;
lock (_outerLock) // Accept only one client at a time
{
lock (_innerLock) // Acquire inner lock
{
if (_disposed) throw new InvalidOperationException();
_currentJob = job;
Monitor.Pulse(_innerLock); // Notify worker thread about the new job
Monitor.Wait(_innerLock); // Wait for worker thread to process the job
result = _currentResult;
exception = _currentException;
// Clean up
_currentJob = null;
_currentResult = default;
_currentException = null;
}
}
// Throw the exception, if occurred, preserving the stack trace
if (exception != null) ExceptionDispatchInfo.Capture(exception).Throw();
return result;
}
public void Dispose()
{
lock (_outerLock)
{
lock (_innerLock)
{
_disposed = true;
Monitor.Pulse(_innerLock); // Notify worker thread to exit loop
}
}
}
}
用法示例:
var worker = new Worker<int>();
int result = worker.DoWork(() => 1); // Accepts a function as argument
Console.WriteLine($"Result: {result}");
worker.Dispose();
输出:
Result: 1
更新: 以前的解决方案对等待不友好,所以这里是允许适当等待的解决方案。它为每个作业使用 TaskCompletionSource
,存储在 BlockingCollection
.
中
class Worker<TResult> : IDisposable
{
private BlockingCollection<TaskCompletionSource<TResult>> _blockingCollection
= new BlockingCollection<TaskCompletionSource<TResult>>();
public Worker()
{
var thread = new Thread(MainLoop);
thread.IsBackground = true;
thread.Start();
}
private void MainLoop()
{
foreach (var tcs in _blockingCollection.GetConsumingEnumerable())
{
var job = (Func<TResult>)tcs.Task.AsyncState;
try
{
var result = job.Invoke();
tcs.SetResult(result);
}
catch (Exception ex)
{
tcs.TrySetException(ex);
}
}
}
public Task<TResult> DoWorkAsync(Func<TResult> job)
{
var tcs = new TaskCompletionSource<TResult>(job,
TaskCreationOptions.RunContinuationsAsynchronously);
_blockingCollection.Add(tcs);
return tcs.Task;
}
public TResult DoWork(Func<TResult> job) // Synchronous call
{
var task = DoWorkAsync(job);
try { task.Wait(); } catch { } // Swallow the AggregateException
// Throw the original exception, if occurred, preserving the stack trace
if (task.IsFaulted) ExceptionDispatchInfo.Capture(task.Exception.InnerException).Throw();
return task.Result;
}
public void Dispose()
{
_blockingCollection.CompleteAdding();
}
}
使用示例
var worker = new Worker<int>();
int result = await worker.DoWorkAsync(() => 1); // Accepts a function as argument
Console.WriteLine($"Result: {result}");
worker.Dispose();
输出:
Result: 1
假设有很多线程调用 Do()
,只有一个工作线程处理实际工作。
void Do(Job job)
{
concurrentQueue.Enqueue(job);
// wait for job done
}
void workerThread()
{
while (true)
{
Job job;
if (concurrentQueue.TryDequeue(out job))
{
// do job
}
}
}
Do() 应该等到 return 之前的工作完成。所以我写了下面的代码:
class Task
{
public Job job;
public AutoResetEvent ev;
}
void Do(Job job)
{
using (var ev = new AutoResetEvent(false))
{
concurrentQueue.Enqueue(new Task { job = job, ev = ev }));
ev.WaitOne();
}
}
void workerThread()
{
while (true)
{
Task task;
if (concurrentQueue.TryDequeue(out task))
{
// do job
task.ev.Set();
}
}
}
经过一些测试,我发现它按预期工作。但是我不确定分配许多 AutoResetEvents 是否是一个好方法,或者是否有更好的方法来完成?
从同步的角度来看,这工作正常。
不过这样做好像没什么用。如果你想一个接一个地执行作业,你可以使用锁:
lock (lockObject) {
RunJob();
}
您使用此代码的目的是什么?
还有一个效率问题,因为每个任务都会创建一个 OS 事件并等待它。如果您使用更现代的 TaskCompletionSource
如果您同步等待该任务,这将在引擎盖下使用相同的东西。您可以使用异步等待 (await myTCS.Task;
) 来提高效率。当然,这会用 async/await 感染整个调用堆栈。如果这是一个相当低的交易量操作,您将不会获得太多收益。
总的来说,我认为可行,尽管当您说 "many" 线程正在调用 Do() 时,这可能无法很好地扩展……挂起的线程使用资源。
此代码的另一个问题是,在空闲时间,您将在 "workerThread" 中有一个 "hard loop",这将导致您的应用程序 return 高 CPU 利用率.您可能希望将此代码添加到 "workerThread":
if (concurrentQueue.IsEmpty) Thread.Sleep(1);
您可能还想为 WaitOne 调用引入超时以避免日志堵塞。
由于所有客户端都必须等待一个线程来完成这项工作,因此没有真正需要使用队列。所以我建议使用 Monitor
class instead, and specifically the Wait/Pulse 功能。虽然它有点低级和冗长。
class Worker<TResult> : IDisposable
{
private readonly object _outerLock = new object();
private readonly object _innerLock = new object();
private Func<TResult> _currentJob;
private TResult _currentResult;
private Exception _currentException;
private bool _disposed;
public Worker()
{
var thread = new Thread(MainLoop);
thread.IsBackground = true;
thread.Start();
}
private void MainLoop()
{
lock (_innerLock)
{
while (true)
{
Monitor.Wait(_innerLock); // Wait for client requests
if (_disposed) break;
try
{
_currentResult = _currentJob.Invoke();
_currentException = null;
}
catch (Exception ex)
{
_currentException = ex;
_currentResult = default;
}
Monitor.Pulse(_innerLock); // Notify the waiting client that the job is done
}
} // We are done
}
public TResult DoWork(Func<TResult> job)
{
TResult result;
Exception exception;
lock (_outerLock) // Accept only one client at a time
{
lock (_innerLock) // Acquire inner lock
{
if (_disposed) throw new InvalidOperationException();
_currentJob = job;
Monitor.Pulse(_innerLock); // Notify worker thread about the new job
Monitor.Wait(_innerLock); // Wait for worker thread to process the job
result = _currentResult;
exception = _currentException;
// Clean up
_currentJob = null;
_currentResult = default;
_currentException = null;
}
}
// Throw the exception, if occurred, preserving the stack trace
if (exception != null) ExceptionDispatchInfo.Capture(exception).Throw();
return result;
}
public void Dispose()
{
lock (_outerLock)
{
lock (_innerLock)
{
_disposed = true;
Monitor.Pulse(_innerLock); // Notify worker thread to exit loop
}
}
}
}
用法示例:
var worker = new Worker<int>();
int result = worker.DoWork(() => 1); // Accepts a function as argument
Console.WriteLine($"Result: {result}");
worker.Dispose();
输出:
Result: 1
更新: 以前的解决方案对等待不友好,所以这里是允许适当等待的解决方案。它为每个作业使用 TaskCompletionSource
,存储在 BlockingCollection
.
class Worker<TResult> : IDisposable
{
private BlockingCollection<TaskCompletionSource<TResult>> _blockingCollection
= new BlockingCollection<TaskCompletionSource<TResult>>();
public Worker()
{
var thread = new Thread(MainLoop);
thread.IsBackground = true;
thread.Start();
}
private void MainLoop()
{
foreach (var tcs in _blockingCollection.GetConsumingEnumerable())
{
var job = (Func<TResult>)tcs.Task.AsyncState;
try
{
var result = job.Invoke();
tcs.SetResult(result);
}
catch (Exception ex)
{
tcs.TrySetException(ex);
}
}
}
public Task<TResult> DoWorkAsync(Func<TResult> job)
{
var tcs = new TaskCompletionSource<TResult>(job,
TaskCreationOptions.RunContinuationsAsynchronously);
_blockingCollection.Add(tcs);
return tcs.Task;
}
public TResult DoWork(Func<TResult> job) // Synchronous call
{
var task = DoWorkAsync(job);
try { task.Wait(); } catch { } // Swallow the AggregateException
// Throw the original exception, if occurred, preserving the stack trace
if (task.IsFaulted) ExceptionDispatchInfo.Capture(task.Exception.InnerException).Throw();
return task.Result;
}
public void Dispose()
{
_blockingCollection.CompleteAdding();
}
}
使用示例
var worker = new Worker<int>();
int result = await worker.DoWorkAsync(() => 1); // Accepts a function as argument
Console.WriteLine($"Result: {result}");
worker.Dispose();
输出:
Result: 1