作业的多线程队列
Multithread queue of jobs
我有一个可以由多个线程填充的作业队列 (ConcurrentQueue<MyJob>
)。我需要异步地(不是通过主线程)实现此作业的连续执行,但 只能同时由一个 线程执行。我试过这样的事情:
public class ConcurrentLoop {
private static ConcurrentQueue<MyJob> _concurrentQueue = new ConcurrentQueue<MyJob>();
private static Task _currentTask;
private static object _lock = new object();
public static void QueueJob(Job job)
{
_concurrentQueue.Enqueue(job);
checkLoop();
}
private static void checkLoop()
{
if ( _currentTask == null || _currentTask.IsCompleted )
{
lock (_lock)
{
if ( _currentTask == null || _currentTask.IsCompleted )
{
_currentTask = Task.Run(() =>
{
MyJob current;
while( _concurrentQueue.TryDequeue( out current ) )
//Do something
});
}
}
}
}
}
我认为这段代码有问题:如果任务完成执行(TryDequeue
returns false 但任务尚未标记为已完成),此时我得到了一份新工作,它不会被执行。我对吗?如果是这样,如何解决这个问题
你的问题陈述看起来像一个 producer-consumer 问题,需要注意的是你只需要一个消费者。
无需手动重新实现此类功能。
相反,我建议使用 BlockingCollection
—— 在内部它使用 ConcurrentQueue
和一个单独的线程来消费。
请注意,这可能适合也可能不适合您的用例。
类似于:
_blockingCollection = new BlockingCollection<your type>(); // you may want to create bounded or unbounded collection
_consumingThread = new Thread(() =>
{
foreach (var workItem in _blockingCollection.GetConsumingEnumerable()) // blocks when there is no more work to do, continues whenever a new item is added.
{
// do work with workItem
}
});
_consumingThread.Start();
多个生产者(任务或线程)可以将工作项添加到 _blockingCollection
没问题,并且无需担心同步 producers/consumer.
当您完成生产任务后,调用_blockingCollection.CompleteAdding()
(此方法不是线程安全的,因此建议事先停止所有生产者)。
也许,您还应该在某处执行 _consumingThread.Join()
以终止您的消费线程。
我会为此使用 Microsoft 的 Reactive Framework Team 的 Reactive Extensions (NuGet "System.Reactive")。这是一个可爱的抽象。
public class ConcurrentLoop
{
private static Subject<MyJob> _jobs = new Subject<MyJob>();
private static IDisposable _subscription =
_jobs
.Synchronize()
.ObserveOn(Scheduler.Default)
.Subscribe(job =>
{
//Do something
});
public static void QueueJob(MyJob job)
{
_jobs.OnNext(job);
}
}
这很好地将所有传入的作业同步到一个流中,并将执行推到 Scheduler.Default
(基本上是线程池),但是因为它已经序列化了所有输入,一次只能发生一个.这样做的好处是,如果值之间存在显着差异,它会释放线程。这是一个非常精简的解决方案。
要清理,您只需调用 _jobs.OnCompleted();
或 _subscription.Dispose();
。
我有一个可以由多个线程填充的作业队列 (ConcurrentQueue<MyJob>
)。我需要异步地(不是通过主线程)实现此作业的连续执行,但 只能同时由一个 线程执行。我试过这样的事情:
public class ConcurrentLoop {
private static ConcurrentQueue<MyJob> _concurrentQueue = new ConcurrentQueue<MyJob>();
private static Task _currentTask;
private static object _lock = new object();
public static void QueueJob(Job job)
{
_concurrentQueue.Enqueue(job);
checkLoop();
}
private static void checkLoop()
{
if ( _currentTask == null || _currentTask.IsCompleted )
{
lock (_lock)
{
if ( _currentTask == null || _currentTask.IsCompleted )
{
_currentTask = Task.Run(() =>
{
MyJob current;
while( _concurrentQueue.TryDequeue( out current ) )
//Do something
});
}
}
}
}
}
我认为这段代码有问题:如果任务完成执行(TryDequeue
returns false 但任务尚未标记为已完成),此时我得到了一份新工作,它不会被执行。我对吗?如果是这样,如何解决这个问题
你的问题陈述看起来像一个 producer-consumer 问题,需要注意的是你只需要一个消费者。
无需手动重新实现此类功能。
相反,我建议使用 BlockingCollection
—— 在内部它使用 ConcurrentQueue
和一个单独的线程来消费。
请注意,这可能适合也可能不适合您的用例。
类似于:
_blockingCollection = new BlockingCollection<your type>(); // you may want to create bounded or unbounded collection
_consumingThread = new Thread(() =>
{
foreach (var workItem in _blockingCollection.GetConsumingEnumerable()) // blocks when there is no more work to do, continues whenever a new item is added.
{
// do work with workItem
}
});
_consumingThread.Start();
多个生产者(任务或线程)可以将工作项添加到 _blockingCollection
没问题,并且无需担心同步 producers/consumer.
当您完成生产任务后,调用_blockingCollection.CompleteAdding()
(此方法不是线程安全的,因此建议事先停止所有生产者)。
也许,您还应该在某处执行 _consumingThread.Join()
以终止您的消费线程。
我会为此使用 Microsoft 的 Reactive Framework Team 的 Reactive Extensions (NuGet "System.Reactive")。这是一个可爱的抽象。
public class ConcurrentLoop
{
private static Subject<MyJob> _jobs = new Subject<MyJob>();
private static IDisposable _subscription =
_jobs
.Synchronize()
.ObserveOn(Scheduler.Default)
.Subscribe(job =>
{
//Do something
});
public static void QueueJob(MyJob job)
{
_jobs.OnNext(job);
}
}
这很好地将所有传入的作业同步到一个流中,并将执行推到 Scheduler.Default
(基本上是线程池),但是因为它已经序列化了所有输入,一次只能发生一个.这样做的好处是,如果值之间存在显着差异,它会释放线程。这是一个非常精简的解决方案。
要清理,您只需调用 _jobs.OnCompleted();
或 _subscription.Dispose();
。