实现一个有上限和有缓冲的作业执行器
Implement a capped and buffered job executor
我想实现一个有容量上限、带缓冲的作业执行器。
它的接口如下(一个构造函数和一个方法):
// Desired public API as given in the question (declarations only; not compilable as written).
public class CappedBufferedExecutor {
// bufferCapping: maximum number of unique values collected before a batch is executed.
// fillTimeInMillisec: time after which a partially filled buffer is executed anyway.
public CappedBufferedExecutor(int bufferCapping, int fillTimeInMillisec);
// Enqueues val; the returned task is meant to complete once the batch containing val has been executed.
// NOTE(review): the meaning of the bool result is not specified in the question.
public Task<bool> EnqueueAsync(string val);
}
想法是:值以异步方式入队;一旦经过 fillTimeInMillisec 毫秒,或者缓冲区中的唯一值数量达到上限,就真正执行处理,并让所有挂起的异步任务完成。执行完成后(这可能需要很长时间),缓冲区可以重新填充,并可以开始新一轮异步执行。
我设想的伪代码如下:
- 使用 Timer 等待 fillTime 过去;时间一到,就创建一个新任务来执行工作(见下文)。
- 每当有新值到来时,以读模式获取 rwlock。检查缓冲区是否已满;如果已满,就等待一个 ManualResetEvent 或 TaskCompletionSource。
- 将新值添加到缓冲区(HashSet<string>)中。
- 如果缓冲区已满,就创建一个新的执行任务:它以写模式获取 rwlock,处理所有已收集的值,然后通过 TaskCompletionSource 唤醒所有挂起的任务。
- 等待上一步提到的缓冲执行任务所对应的 TaskCompletionSource。
我的问题是:如何让 Timer 与“缓冲区已满”检查保持同步?缓冲区已满时应如何等待?开始执行时,如何切换到新的 TaskCompletionSource 实例,从而允许新值继续到达?
使用 Reactive Extensions (Rx) 可以很容易地实现这种功能。下面是一个使用 Buffer 方法的基本示例:
void Main()
{
    // Batches hold at most 2 items, or whatever arrived within one second.
    var processor = new Processor();
    processor.SetupBufferedProcessor(2, TimeSpan.FromSeconds(1));
    foreach (var item in new[] { "A", "B", "C" })
    {
        processor.Enqueue(item);
    }
    // Keep the program alive until Enter is pressed, then shut the processor down.
    Console.ReadLine();
    processor.Dispose();
}
// Batches enqueued strings with Rx. A batch is emitted when bufferSize items have been collected or
// bufferCloseTimespan has elapsed, whichever comes first. Each non-empty batch is processed on the thread pool.
public sealed class Processor : IDisposable
{
    private readonly Subject<string> subject = new Subject<string>();
    // Processing tasks started so far (completed ones are pruned), so Dispose can wait for them.
    private readonly System.Collections.Generic.List<Task> pending = new System.Collections.Generic.List<Task>();
    private readonly object pendingSync = new object();
    private IDisposable subscription;
    private bool disposed;

    public void Enqueue(string item)
    {
        subject.OnNext(item);
    }

    // Starts batching. Calling it again replaces (and disposes) the previous subscription.
    public void SetupBufferedProcessor(int bufferSize, TimeSpan bufferCloseTimespan)
    {
        subscription?.Dispose();
        // Produce a batch every bufferCloseTimespan, or as soon as bufferSize items have arrived,
        // whichever comes first.
        subscription = subject.AsObservable()
            .Buffer(bufferCloseTimespan, bufferSize)
            .Where(list => list.Any()) // suppress empty batches (nothing enqueued during the time window)
            .Subscribe(list =>
            {
                // Plain (non-async) observer: the task is tracked here instead of being awaited inside
                // an async void lambda, where an exception would be unobservable and crash the process.
                Task task = Task.Run(() =>
                {
                    Console.WriteLine(string.Join(",", list));
                    Thread.Sleep(2000); // For demo purposes, to demonstrate processing takes place parallel with other batches.
                });
                lock (pendingSync)
                {
                    pending.RemoveAll(t => t.IsCompleted);
                    pending.Add(task);
                }
            });
    }

    // Completes the input so the last, partially filled batch is still emitted, then waits for every
    // processing task to finish. Calling it more than once has no further effect.
    public void Dispose()
    {
        if (disposed)
        {
            return;
        }
        disposed = true;
        // Completing the subject makes Buffer emit whatever it still holds. Only disposing the
        // subscription would silently drop those items.
        subject.OnCompleted();
        subscription?.Dispose();
        Task[] toWait;
        lock (pendingSync)
        {
            toWait = pending.ToArray();
        }
        try
        {
            Task.WaitAll(toWait);
        }
        catch (AggregateException ex)
        {
            // Dispose must not throw, but batch failures should not vanish either.
            foreach (var inner in ex.InnerExceptions)
            {
                Console.Error.WriteLine("Batch processing failed: " + inner);
            }
        }
    }
}
这将先输出
A,B
一秒钟后再输出
C
Rx 的源代码托管在 GitHub 上。
有关 Rx 的更多信息,请参阅:http://www.introtorx.com/
此示例还可以进一步改进:保存所创建的 Task 对象的引用,以便在应用程序结束前正确地等待它们完成。不过,它已足以说明基本思路。
这只是一个概念验证,所以不要期望太高 :-)
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace ConsoleApp
{
class Program
{
    // Demo entry point: every non-blank console line is enqueued. A blank or whitespace-only line
    // (or end of input) stops the loop and unsubscribes the executor.
    static void Main (string[] args)
    {
        var buffer = CreateBuffer ();
        var executor = new Executor<string> (SomeWork, buffer);
        executor.ProcessingStarted += Executor_ProcessingStarted;
        while (true)
        {
            string userInput = Console.ReadLine ();
            // The blank line is the quit signal, not a work item. Check it before enqueueing so it is
            // never added to the buffer, where it could fill the cap and be processed as a job.
            if (string.IsNullOrWhiteSpace (userInput))
            {
                break;
            }
            buffer.Enqueue (userInput);
        }
        executor.Dispose ();
    }
    //----------------------------------------------------------------------------------------------------------------------------------
    // Builds the demo pipeline: a unique-items buffer capped at 3 items, decorated with a 5-second flush timeout.
    private static IBuffer<string> CreateBuffer ()
    {
        var buffer = new UniqueItemsBuffer<string> (3);
        buffer.DataAvailable += (items) => Console.WriteLine ("BUFFER :: data available raised.");
        var alert = new Alert ();
        var bufferWithTimeout = new BufferWithTimeout<string> (buffer, alert, TimeSpan.FromSeconds (5));
        return bufferWithTimeout;
    }
    //----------------------------------------------------------------------------------------------------------------------------------
    // A single shared instance, because Random objects created in quick succession may share a time-based seed
    // (notably on .NET Framework). System.Random is not thread-safe, and SomeWork runs concurrently on
    // thread-pool threads (one Task.Run per item in Executor), so every access is serialized through rndSync.
    static readonly Random rnd = new Random ();
    static readonly object rndSync = new object ();

    // Simulated job: sleeps for a random 1000-7999 ms and always reports success.
    public static bool SomeWork (string x)
    {
        int delay;
        lock (rndSync)
        {
            delay = rnd.Next (1000, 8000);
        }
        Console.WriteLine ($" +++ Starting SomeWork for: {x}, delay: {delay} ms");
        Thread.Sleep (delay);
        Console.WriteLine ($" --- SomeWork for: {x} - finished.");
        return true;
    }
    //----------------------------------------------------------------------------------------------------------------------------------
    // Logs once every task of a batch has completed. It awaits instead of blocking a thread-pool thread with
    // Task.WaitAll. If any task faults, the completion message is not written.
    private static void Executor_ProcessingStarted (IReadOnlyList<Task<bool>> items)
    {
        Task.Run (async () =>
        {
            await Task.WhenAll (items);
            Console.WriteLine ("Finished processing tasks, count = " + items.Count);
        });
    }
}
//====== actual code ===================================================================================================================
// Handler shape for "a batch is ready": receives the batch as a read-only list.
public delegate void ItemsAvailable<T> (IReadOnlyList<T> items);
// Work callback invoked once per item; returns true if the job finished successfully.
public delegate bool ProcessItem<T> (T item);
//======================================================================================================================================
// Source of batches to process.
public interface IDataAvailableEvent<T>
{
// Raised when the buffered items need to be processed. Implementations should clear the buffer before raising it.
event ItemsAvailable<T> DataAvailable;
}
//======================================================================================================================================
// Raised by an executor once it has created and started a task for every item of a batch.
// NOTE(review): T is not used by any member; the event is always typed on Task<bool>.
public interface IProcessingStartedEvent<T>
{
event ItemsAvailable<Task<bool>> ProcessingStarted;
}
//======================================================================================================================================
// A buffer that collects items and hands them out in batches through DataAvailable.
public interface IBuffer<T> : IDataAvailableEvent<T>
{
// Adds an item to the buffer. An implementation may ignore it (e.g. a duplicate when only unique items are kept).
// Returns true if the buffer is non-empty after the call, false if it is empty.
bool Enqueue (T item);
// Should clear the buffer and raise DataAvailable with its former contents (no event if it was already empty).
void FlushBuffer ();
}
//======================================================================================================================================
// Buffer that keeps only unique items. It raises DataAvailable with all of them as soon as 'capacity'
// unique items have been collected, or when FlushBuffer is called.
// Not thread-safe: use it from one thread only, or wrap it with ThreadSafeBuffer<T>.
public class UniqueItemsBuffer<T> : IBuffer<T>
{
    public event ItemsAvailable<T> DataAvailable;
    readonly int capacity;
    readonly HashSet<T> items;

    // capacity: number of unique items that triggers an automatic flush. A value of zero or less never
    //           triggers one, so the buffer is then emptied only by FlushBuffer (e.g. from a timeout decorator).
    // comparer: decides which items count as duplicates; null means the default equality comparer for T.
    public UniqueItemsBuffer (int capacity = 10, IEqualityComparer<T> comparer = null)
    {
        this.capacity = capacity;
        items = new HashSet<T> (comparer);
    }

    // Adds the item unless an equal one is already buffered, and flushes when the cap is reached.
    // Returns true if the buffer still holds items afterwards (normally false right after an automatic flush).
    public bool Enqueue (T item)
    {
        if (items.Add (item) && items.Count == capacity)
        {
            FlushBuffer ();
        }
        return items.Count > 0;
    }

    // Clears the buffer, then passes a snapshot of the former contents to DataAvailable.
    // No event is raised when the buffer is empty.
    public void FlushBuffer ()
    {
        Console.WriteLine ("BUFFER :: flush, item count = " + items.Count);
        if (items.Count > 0)
        {
            var itemsCopy = items.ToList ();
            items.Clear ();
            DataAvailable?.Invoke (itemsCopy);
        }
    }
}
//======================================================================================================================================
// Listens to a batch source and, for every batch it raises, starts one Task.Run per item that calls
// the supplied work delegate.
public class Executor<T> : IProcessingStartedEvent<T>, IDisposable
{
    // Raised after every task of a batch has been started, with those tasks in batch order.
    public event ItemsAvailable<Task<bool>> ProcessingStarted;
    readonly ProcessItem<T> work;
    readonly IDataAvailableEvent<T> dataEvent;

    // Throws ArgumentNullException if work or dataEvent is null. Failing fast here is better than
    // letting a null delegate surface later as a batch of faulted tasks.
    public Executor (ProcessItem<T> work, IDataAvailableEvent<T> dataEvent)
    {
        if (work == null)
        {
            throw new ArgumentNullException (nameof (work));
        }
        if (dataEvent == null)
        {
            throw new ArgumentNullException (nameof (dataEvent));
        }
        this.work = work;
        this.dataEvent = dataEvent;
        dataEvent.DataAvailable += DataEvent_DataAvailable;
    }

    private void DataEvent_DataAvailable (IReadOnlyList<T> items)
    {
        Console.WriteLine ("EXECUTOR :: new items to process available, count = " + items.Count);
        // Each item gets its own thread-pool task; exceptions from 'work' end up in that task.
        List<Task<bool>> tasks = items.Select (item => Task.Run (() => work (item))).ToList ();
        Console.WriteLine ("EXECUTOR :: raising processing started event (this msg can appear later than messages from SomeWork)");
        ProcessingStarted?.Invoke (tasks);
    }

    // Stops listening for new batches. Tasks that were already started keep running.
    public void Dispose ()
    {
        dataEvent.DataAvailable -= DataEvent_DataAvailable;
    }
}
//======================================================================================================================================
// Decorator for filling a buffer from many threads. Enqueue and FlushBuffer on the wrapped buffer are
// serialized with a private lock. Events raised by the wrapped buffer are re-raised from this wrapper;
// when a call through this wrapper triggers them, they are raised while the lock is still held.
public sealed class ThreadSafeBuffer<T> : IBuffer<T>
{
    public event ItemsAvailable<T> DataAvailable;
    readonly IBuffer<T> target;
    readonly object sync = new object ();

    private ThreadSafeBuffer (IBuffer<T> target)
    {
        this.target = target;
        // TODO: this subscription is never removed, so the wrapped buffer keeps this wrapper alive.
        target.DataAvailable += Forward;
    }

    // Re-raises the wrapped buffer's event to this wrapper's subscribers.
    void Forward (IReadOnlyList<T> items) => DataAvailable?.Invoke (items);

    public bool Enqueue (T item)
    {
        bool hasItems;
        lock (sync)
        {
            hasItems = target.Enqueue (item);
        }
        return hasItems;
    }

    public void FlushBuffer ()
    {
        lock (sync)
        {
            target.FlushBuffer ();
        }
    }

    // Returns the argument unchanged if it is already a ThreadSafeBuffer<T>; otherwise wraps it.
    public static IBuffer<T> MakeThreadSafe (IBuffer<T> target) =>
        target is ThreadSafeBuffer<T> ? target : new ThreadSafeBuffer<T> (target);
}
//======================================================================================================================================
// Abstraction over "run an action once after a delay"; BufferWithTimeout uses it to flush after a timeout.
public interface IAlert
{
// Schedules 'action' to run once after 'delay' without blocking the caller. The returned
// CancellationTokenSource lets the caller cancel the alert; how strictly cancellation is honoured
// depends on the implementation.
CancellationTokenSource CreateAlert (TimeSpan delay, Action action);
}
// IAlert implementation backed by one-shot System.Timers.Timer instances.
public class Alert : IAlert
{
    // Armed timers are referenced here so they are not garbage-collected before they fire.
    // Elapsed runs on a thread-pool thread while CreateAlert runs on the caller's thread, so the list
    // is only touched under timersSync (List<T> is not thread-safe).
    readonly List<System.Timers.Timer> timers = new List<System.Timers.Timer> ();
    readonly object timersSync = new object ();

    // Runs 'action' once, on a thread-pool thread, after 'delay'. Cancelling the returned source stops and
    // disposes the timer right away. An Elapsed callback that is already running still checks the token and
    // skips 'action' if cancellation was requested before that check.
    public CancellationTokenSource CreateAlert (TimeSpan delay, Action action)
    {
        if (action == null)
        {
            // System.Timers.Timer suppresses exceptions thrown from Elapsed, so a null action
            // would otherwise fail silently.
            throw new ArgumentNullException (nameof (action));
        }
        var cts = new CancellationTokenSource ();
        var timer = new System.Timers.Timer (delay.TotalMilliseconds);
        timer.AutoReset = false; // just one tick
        timer.Elapsed += (sender, e) =>
        {
            Release (timer);
            if (cts.Token.IsCancellationRequested)
            {
                return;
            }
            action.Invoke ();
        };
        lock (timersSync)
        {
            timers.Add (timer);
        }
        cts.Token.Register (() => Release (timer));
        timer.Enabled = true;
        return cts;
    }

    // Forgets and disposes a timer. It is safe to call more than once for the same timer.
    void Release (System.Timers.Timer timer)
    {
        lock (timersSync)
        {
            timers.Remove (timer);
        }
        timer.Dispose ();
    }
}
// Decorator that adds a flush timeout to a buffer. When an item leaves the buffer non-empty and no timer is armed,
// an alert is scheduled that flushes the buffer after 'timeout'. The alert is discarded if the wrapped buffer
// flushes first (e.g. because its cap was reached) or FlushBuffer is called.
// State is guarded by 'sync'. The wrapped buffer is made thread-safe because the alert callback may run on
// another thread.
public class BufferWithTimeout<T> : IBuffer<T>
{
    public event ItemsAvailable<T> DataAvailable;
    readonly IBuffer<T> target;
    readonly IAlert alert;
    readonly TimeSpan timeout;
    CancellationTokenSource cts;
    // Incremented every time the armed timer is discarded. An alert acts only if the generation it was armed
    // under is still current. This filters out alerts that passed their cancellation check just before
    // DisableTimer ran, which would otherwise flush a newer buffer early.
    int generation;
    readonly object sync = new object ();

    // Throws ArgumentNullException if target or alert is null.
    public BufferWithTimeout (IBuffer<T> target, IAlert alert, TimeSpan timeout)
    {
        if (target == null)
        {
            throw new ArgumentNullException (nameof (target));
        }
        if (alert == null)
        {
            throw new ArgumentNullException (nameof (alert));
        }
        this.target = ThreadSafeBuffer<T>.MakeThreadSafe (target); // alert can be raised from different thread
        this.alert = alert;
        this.timeout = timeout;
        // Subscribe through the same (thread-safe) instance that every call goes through.
        this.target.DataAvailable += Target_DataAvailable; // TODO: unpin event
    }

    // The wrapped buffer flushed: the pending timeout is no longer needed.
    private void Target_DataAvailable (IReadOnlyList<T> items)
    {
        lock (sync)
        {
            DisableTimer ();
        }
        DataAvailable?.Invoke (items);
    }

    public bool Enqueue (T item)
    {
        lock (sync)
        {
            bool hasItems = target.Enqueue (item); // can raise underlying flush -> dataAvailable event (will disable timer)
            // An empty buffer needs no timer; a non-empty one needs exactly one.
            if (hasItems && cts == null)
            {
                Console.WriteLine ("TIMER :: created alert");
                int armedGeneration = generation;
                cts = alert.CreateAlert (timeout, () => HandleAlert (armedGeneration));
            }
            return hasItems;
        }
    }

    public void FlushBuffer ()
    {
        lock (sync)
        {
            DisableTimer ();
            target.FlushBuffer ();
        }
    }

    private void HandleAlert (int armedGeneration)
    {
        lock (sync)
        {
            if (armedGeneration != generation)
            {
                // Stale alert: its timer was discarded after it fired, possibly with a newer one armed since.
                return;
            }
            Console.WriteLine ("TIMER :: handler, will call buffer flush");
            // The alert is spent. Clear it before flushing so the next Enqueue can arm a fresh timer even if
            // this flush raises no DataAvailable event.
            DisableTimer ();
            target.FlushBuffer ();
        }
    }

    // Must be called under 'sync'.
    private void DisableTimer ()
    {
        cts?.Cancel ();
        cts = null;
        generation++;
        Console.WriteLine ("TIMER :: disable");
    }
}
}
我想实现一个有容量上限、带缓冲的作业执行器。
它的接口如下(一个构造函数和一个方法):
// Desired public API as given in the question (declarations only; not compilable as written).
public class CappedBufferedExecutor {
// bufferCapping: maximum number of unique values collected before a batch is executed.
// fillTimeInMillisec: time after which a partially filled buffer is executed anyway.
public CappedBufferedExecutor(int bufferCapping, int fillTimeInMillisec);
// Enqueues val; the returned task is meant to complete once the batch containing val has been executed.
// NOTE(review): the meaning of the bool result is not specified in the question.
public Task<bool> EnqueueAsync(string val);
}
想法是:值以异步方式入队;一旦经过 fillTimeInMillisec 毫秒,或者缓冲区中的唯一值数量达到上限,就真正执行处理,并让所有挂起的异步任务完成。执行完成后(这可能需要很长时间),缓冲区可以重新填充,并可以开始新一轮异步执行。
我设想的伪代码如下:
- 使用 Timer 等待 fillTime 过去;时间一到,就创建一个新任务来执行工作(见下文)。
- 每当有新值到来时,以读模式获取 rwlock。检查缓冲区是否已满;如果已满,就等待一个 ManualResetEvent 或 TaskCompletionSource。
- 将新值添加到缓冲区(HashSet<string>)中。
- 如果缓冲区已满,就创建一个新的执行任务:它以写模式获取 rwlock,处理所有已收集的值,然后通过 TaskCompletionSource 唤醒所有挂起的任务。
- 等待上一步提到的缓冲执行任务所对应的 TaskCompletionSource。
我的问题是:如何让 Timer 与“缓冲区已满”检查保持同步?缓冲区已满时应如何等待?开始执行时,如何切换到新的 TaskCompletionSource 实例,从而允许新值继续到达?
使用 Reactive Extensions (Rx) 可以很容易地实现这种功能。下面是一个使用 Buffer 方法的基本示例:
void Main()
{
    // Batches hold at most 2 items, or whatever arrived within one second.
    var processor = new Processor();
    processor.SetupBufferedProcessor(2, TimeSpan.FromSeconds(1));
    foreach (var item in new[] { "A", "B", "C" })
    {
        processor.Enqueue(item);
    }
    // Keep the program alive until Enter is pressed, then shut the processor down.
    Console.ReadLine();
    processor.Dispose();
}
// Batches enqueued strings with Rx. A batch is emitted when bufferSize items have been collected or
// bufferCloseTimespan has elapsed, whichever comes first. Each non-empty batch is processed on the thread pool.
public sealed class Processor : IDisposable
{
    private readonly Subject<string> subject = new Subject<string>();
    // Processing tasks started so far (completed ones are pruned), so Dispose can wait for them.
    private readonly System.Collections.Generic.List<Task> pending = new System.Collections.Generic.List<Task>();
    private readonly object pendingSync = new object();
    private IDisposable subscription;
    private bool disposed;

    public void Enqueue(string item)
    {
        subject.OnNext(item);
    }

    // Starts batching. Calling it again replaces (and disposes) the previous subscription.
    public void SetupBufferedProcessor(int bufferSize, TimeSpan bufferCloseTimespan)
    {
        subscription?.Dispose();
        // Produce a batch every bufferCloseTimespan, or as soon as bufferSize items have arrived,
        // whichever comes first.
        subscription = subject.AsObservable()
            .Buffer(bufferCloseTimespan, bufferSize)
            .Where(list => list.Any()) // suppress empty batches (nothing enqueued during the time window)
            .Subscribe(list =>
            {
                // Plain (non-async) observer: the task is tracked here instead of being awaited inside
                // an async void lambda, where an exception would be unobservable and crash the process.
                Task task = Task.Run(() =>
                {
                    Console.WriteLine(string.Join(",", list));
                    Thread.Sleep(2000); // For demo purposes, to demonstrate processing takes place parallel with other batches.
                });
                lock (pendingSync)
                {
                    pending.RemoveAll(t => t.IsCompleted);
                    pending.Add(task);
                }
            });
    }

    // Completes the input so the last, partially filled batch is still emitted, then waits for every
    // processing task to finish. Calling it more than once has no further effect.
    public void Dispose()
    {
        if (disposed)
        {
            return;
        }
        disposed = true;
        // Completing the subject makes Buffer emit whatever it still holds. Only disposing the
        // subscription would silently drop those items.
        subject.OnCompleted();
        subscription?.Dispose();
        Task[] toWait;
        lock (pendingSync)
        {
            toWait = pending.ToArray();
        }
        try
        {
            Task.WaitAll(toWait);
        }
        catch (AggregateException ex)
        {
            // Dispose must not throw, but batch failures should not vanish either.
            foreach (var inner in ex.InnerExceptions)
            {
                Console.Error.WriteLine("Batch processing failed: " + inner);
            }
        }
    }
}
这将先输出
A,B
一秒钟后再输出
C
Rx 的源代码托管在 GitHub 上。有关 Rx 的更多信息,请参阅:http://www.introtorx.com/
此示例还可以进一步改进:保存所创建的 Task 对象的引用,以便在应用程序结束前正确地等待它们完成。不过,它已足以说明基本思路。
这只是一个概念验证,所以不要期望太高 :-)
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace ConsoleApp
{
class Program
{
    // Demo entry point: every non-blank console line is enqueued. A blank or whitespace-only line
    // (or end of input) stops the loop and unsubscribes the executor.
    static void Main (string[] args)
    {
        var buffer = CreateBuffer ();
        var executor = new Executor<string> (SomeWork, buffer);
        executor.ProcessingStarted += Executor_ProcessingStarted;
        while (true)
        {
            string userInput = Console.ReadLine ();
            // The blank line is the quit signal, not a work item. Check it before enqueueing so it is
            // never added to the buffer, where it could fill the cap and be processed as a job.
            if (string.IsNullOrWhiteSpace (userInput))
            {
                break;
            }
            buffer.Enqueue (userInput);
        }
        executor.Dispose ();
    }
    //----------------------------------------------------------------------------------------------------------------------------------
    // Builds the demo pipeline: a unique-items buffer capped at 3 items, decorated with a 5-second flush timeout.
    private static IBuffer<string> CreateBuffer ()
    {
        var buffer = new UniqueItemsBuffer<string> (3);
        buffer.DataAvailable += (items) => Console.WriteLine ("BUFFER :: data available raised.");
        var alert = new Alert ();
        var bufferWithTimeout = new BufferWithTimeout<string> (buffer, alert, TimeSpan.FromSeconds (5));
        return bufferWithTimeout;
    }
    //----------------------------------------------------------------------------------------------------------------------------------
    // A single shared instance, because Random objects created in quick succession may share a time-based seed
    // (notably on .NET Framework). System.Random is not thread-safe, and SomeWork runs concurrently on
    // thread-pool threads (one Task.Run per item in Executor), so every access is serialized through rndSync.
    static readonly Random rnd = new Random ();
    static readonly object rndSync = new object ();

    // Simulated job: sleeps for a random 1000-7999 ms and always reports success.
    public static bool SomeWork (string x)
    {
        int delay;
        lock (rndSync)
        {
            delay = rnd.Next (1000, 8000);
        }
        Console.WriteLine ($" +++ Starting SomeWork for: {x}, delay: {delay} ms");
        Thread.Sleep (delay);
        Console.WriteLine ($" --- SomeWork for: {x} - finished.");
        return true;
    }
    //----------------------------------------------------------------------------------------------------------------------------------
    // Logs once every task of a batch has completed. It awaits instead of blocking a thread-pool thread with
    // Task.WaitAll. If any task faults, the completion message is not written.
    private static void Executor_ProcessingStarted (IReadOnlyList<Task<bool>> items)
    {
        Task.Run (async () =>
        {
            await Task.WhenAll (items);
            Console.WriteLine ("Finished processing tasks, count = " + items.Count);
        });
    }
}
//====== actual code ===================================================================================================================
// Handler shape for "a batch is ready": receives the batch as a read-only list.
public delegate void ItemsAvailable<T> (IReadOnlyList<T> items);
// Work callback invoked once per item; returns true if the job finished successfully.
public delegate bool ProcessItem<T> (T item);
//======================================================================================================================================
// Source of batches to process.
public interface IDataAvailableEvent<T>
{
// Raised when the buffered items need to be processed. Implementations should clear the buffer before raising it.
event ItemsAvailable<T> DataAvailable;
}
//======================================================================================================================================
// Raised by an executor once it has created and started a task for every item of a batch.
// NOTE(review): T is not used by any member; the event is always typed on Task<bool>.
public interface IProcessingStartedEvent<T>
{
event ItemsAvailable<Task<bool>> ProcessingStarted;
}
//======================================================================================================================================
// A buffer that collects items and hands them out in batches through DataAvailable.
public interface IBuffer<T> : IDataAvailableEvent<T>
{
// Adds an item to the buffer. An implementation may ignore it (e.g. a duplicate when only unique items are kept).
// Returns true if the buffer is non-empty after the call, false if it is empty.
bool Enqueue (T item);
// Should clear the buffer and raise DataAvailable with its former contents (no event if it was already empty).
void FlushBuffer ();
}
//======================================================================================================================================
// Buffer that keeps only unique items. It raises DataAvailable with all of them as soon as 'capacity'
// unique items have been collected, or when FlushBuffer is called.
// Not thread-safe: use it from one thread only, or wrap it with ThreadSafeBuffer<T>.
public class UniqueItemsBuffer<T> : IBuffer<T>
{
    public event ItemsAvailable<T> DataAvailable;
    readonly int capacity;
    readonly HashSet<T> items;

    // capacity: number of unique items that triggers an automatic flush. A value of zero or less never
    //           triggers one, so the buffer is then emptied only by FlushBuffer (e.g. from a timeout decorator).
    // comparer: decides which items count as duplicates; null means the default equality comparer for T.
    public UniqueItemsBuffer (int capacity = 10, IEqualityComparer<T> comparer = null)
    {
        this.capacity = capacity;
        items = new HashSet<T> (comparer);
    }

    // Adds the item unless an equal one is already buffered, and flushes when the cap is reached.
    // Returns true if the buffer still holds items afterwards (normally false right after an automatic flush).
    public bool Enqueue (T item)
    {
        if (items.Add (item) && items.Count == capacity)
        {
            FlushBuffer ();
        }
        return items.Count > 0;
    }

    // Clears the buffer, then passes a snapshot of the former contents to DataAvailable.
    // No event is raised when the buffer is empty.
    public void FlushBuffer ()
    {
        Console.WriteLine ("BUFFER :: flush, item count = " + items.Count);
        if (items.Count > 0)
        {
            var itemsCopy = items.ToList ();
            items.Clear ();
            DataAvailable?.Invoke (itemsCopy);
        }
    }
}
//======================================================================================================================================
// Listens to a batch source and, for every batch it raises, starts one Task.Run per item that calls
// the supplied work delegate.
public class Executor<T> : IProcessingStartedEvent<T>, IDisposable
{
    // Raised after every task of a batch has been started, with those tasks in batch order.
    public event ItemsAvailable<Task<bool>> ProcessingStarted;
    readonly ProcessItem<T> work;
    readonly IDataAvailableEvent<T> dataEvent;

    // Throws ArgumentNullException if work or dataEvent is null. Failing fast here is better than
    // letting a null delegate surface later as a batch of faulted tasks.
    public Executor (ProcessItem<T> work, IDataAvailableEvent<T> dataEvent)
    {
        if (work == null)
        {
            throw new ArgumentNullException (nameof (work));
        }
        if (dataEvent == null)
        {
            throw new ArgumentNullException (nameof (dataEvent));
        }
        this.work = work;
        this.dataEvent = dataEvent;
        dataEvent.DataAvailable += DataEvent_DataAvailable;
    }

    private void DataEvent_DataAvailable (IReadOnlyList<T> items)
    {
        Console.WriteLine ("EXECUTOR :: new items to process available, count = " + items.Count);
        // Each item gets its own thread-pool task; exceptions from 'work' end up in that task.
        List<Task<bool>> tasks = items.Select (item => Task.Run (() => work (item))).ToList ();
        Console.WriteLine ("EXECUTOR :: raising processing started event (this msg can appear later than messages from SomeWork)");
        ProcessingStarted?.Invoke (tasks);
    }

    // Stops listening for new batches. Tasks that were already started keep running.
    public void Dispose ()
    {
        dataEvent.DataAvailable -= DataEvent_DataAvailable;
    }
}
//======================================================================================================================================
// Decorator for filling a buffer from many threads. Enqueue and FlushBuffer on the wrapped buffer are
// serialized with a private lock. Events raised by the wrapped buffer are re-raised from this wrapper;
// when a call through this wrapper triggers them, they are raised while the lock is still held.
public sealed class ThreadSafeBuffer<T> : IBuffer<T>
{
    public event ItemsAvailable<T> DataAvailable;
    readonly IBuffer<T> target;
    readonly object sync = new object ();

    private ThreadSafeBuffer (IBuffer<T> target)
    {
        this.target = target;
        // TODO: this subscription is never removed, so the wrapped buffer keeps this wrapper alive.
        target.DataAvailable += Forward;
    }

    // Re-raises the wrapped buffer's event to this wrapper's subscribers.
    void Forward (IReadOnlyList<T> items) => DataAvailable?.Invoke (items);

    public bool Enqueue (T item)
    {
        bool hasItems;
        lock (sync)
        {
            hasItems = target.Enqueue (item);
        }
        return hasItems;
    }

    public void FlushBuffer ()
    {
        lock (sync)
        {
            target.FlushBuffer ();
        }
    }

    // Returns the argument unchanged if it is already a ThreadSafeBuffer<T>; otherwise wraps it.
    public static IBuffer<T> MakeThreadSafe (IBuffer<T> target) =>
        target is ThreadSafeBuffer<T> ? target : new ThreadSafeBuffer<T> (target);
}
//======================================================================================================================================
// Abstraction over "run an action once after a delay"; BufferWithTimeout uses it to flush after a timeout.
public interface IAlert
{
// Schedules 'action' to run once after 'delay' without blocking the caller. The returned
// CancellationTokenSource lets the caller cancel the alert; how strictly cancellation is honoured
// depends on the implementation.
CancellationTokenSource CreateAlert (TimeSpan delay, Action action);
}
// IAlert implementation backed by one-shot System.Timers.Timer instances.
public class Alert : IAlert
{
    // Armed timers are referenced here so they are not garbage-collected before they fire.
    // Elapsed runs on a thread-pool thread while CreateAlert runs on the caller's thread, so the list
    // is only touched under timersSync (List<T> is not thread-safe).
    readonly List<System.Timers.Timer> timers = new List<System.Timers.Timer> ();
    readonly object timersSync = new object ();

    // Runs 'action' once, on a thread-pool thread, after 'delay'. Cancelling the returned source stops and
    // disposes the timer right away. An Elapsed callback that is already running still checks the token and
    // skips 'action' if cancellation was requested before that check.
    public CancellationTokenSource CreateAlert (TimeSpan delay, Action action)
    {
        if (action == null)
        {
            // System.Timers.Timer suppresses exceptions thrown from Elapsed, so a null action
            // would otherwise fail silently.
            throw new ArgumentNullException (nameof (action));
        }
        var cts = new CancellationTokenSource ();
        var timer = new System.Timers.Timer (delay.TotalMilliseconds);
        timer.AutoReset = false; // just one tick
        timer.Elapsed += (sender, e) =>
        {
            Release (timer);
            if (cts.Token.IsCancellationRequested)
            {
                return;
            }
            action.Invoke ();
        };
        lock (timersSync)
        {
            timers.Add (timer);
        }
        cts.Token.Register (() => Release (timer));
        timer.Enabled = true;
        return cts;
    }

    // Forgets and disposes a timer. It is safe to call more than once for the same timer.
    void Release (System.Timers.Timer timer)
    {
        lock (timersSync)
        {
            timers.Remove (timer);
        }
        timer.Dispose ();
    }
}
// Decorator that adds a flush timeout to a buffer. When an item leaves the buffer non-empty and no timer is armed,
// an alert is scheduled that flushes the buffer after 'timeout'. The alert is discarded if the wrapped buffer
// flushes first (e.g. because its cap was reached) or FlushBuffer is called.
// State is guarded by 'sync'. The wrapped buffer is made thread-safe because the alert callback may run on
// another thread.
public class BufferWithTimeout<T> : IBuffer<T>
{
    public event ItemsAvailable<T> DataAvailable;
    readonly IBuffer<T> target;
    readonly IAlert alert;
    readonly TimeSpan timeout;
    CancellationTokenSource cts;
    // Incremented every time the armed timer is discarded. An alert acts only if the generation it was armed
    // under is still current. This filters out alerts that passed their cancellation check just before
    // DisableTimer ran, which would otherwise flush a newer buffer early.
    int generation;
    readonly object sync = new object ();

    // Throws ArgumentNullException if target or alert is null.
    public BufferWithTimeout (IBuffer<T> target, IAlert alert, TimeSpan timeout)
    {
        if (target == null)
        {
            throw new ArgumentNullException (nameof (target));
        }
        if (alert == null)
        {
            throw new ArgumentNullException (nameof (alert));
        }
        this.target = ThreadSafeBuffer<T>.MakeThreadSafe (target); // alert can be raised from different thread
        this.alert = alert;
        this.timeout = timeout;
        // Subscribe through the same (thread-safe) instance that every call goes through.
        this.target.DataAvailable += Target_DataAvailable; // TODO: unpin event
    }

    // The wrapped buffer flushed: the pending timeout is no longer needed.
    private void Target_DataAvailable (IReadOnlyList<T> items)
    {
        lock (sync)
        {
            DisableTimer ();
        }
        DataAvailable?.Invoke (items);
    }

    public bool Enqueue (T item)
    {
        lock (sync)
        {
            bool hasItems = target.Enqueue (item); // can raise underlying flush -> dataAvailable event (will disable timer)
            // An empty buffer needs no timer; a non-empty one needs exactly one.
            if (hasItems && cts == null)
            {
                Console.WriteLine ("TIMER :: created alert");
                int armedGeneration = generation;
                cts = alert.CreateAlert (timeout, () => HandleAlert (armedGeneration));
            }
            return hasItems;
        }
    }

    public void FlushBuffer ()
    {
        lock (sync)
        {
            DisableTimer ();
            target.FlushBuffer ();
        }
    }

    private void HandleAlert (int armedGeneration)
    {
        lock (sync)
        {
            if (armedGeneration != generation)
            {
                // Stale alert: its timer was discarded after it fired, possibly with a newer one armed since.
                return;
            }
            Console.WriteLine ("TIMER :: handler, will call buffer flush");
            // The alert is spent. Clear it before flushing so the next Enqueue can arm a fresh timer even if
            // this flush raises no DataAvailable event.
            DisableTimer ();
            target.FlushBuffer ();
        }
    }

    // Must be called under 'sync'.
    private void DisableTimer ()
    {
        cts?.Cancel ();
        cts = null;
        generation++;
        Console.WriteLine ("TIMER :: disable");
    }
}
}