C# Immutable counter for multiple fields
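I have a fairly high throughput on message counters (tens of thousands per second), and I'm looking for an efficient way to get the counts without putting locks everywhere, or ideally without locking on every message count, given that I only need an update every 10 seconds.
Using an immutable counter object
I'm using an immutable counter class: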
public class Counter
{
    public Counter(int quotes, int trades)
    {
        Quotes = quotes;
        Trades = trades;
    }
    public readonly int Quotes;
    public readonly int Trades;
    // and some other counter fields snipped
}
and I update it in each message-processing loop:
class MyProcessor
{
    System.Timers.Timer timer;
    Counter counter = new Counter(0, 0);
    public MyProcessor()
    {
        // update every 10 seconds
        this.timer = new System.Timers.Timer(10000);
        timer.Elapsed += (sender, e) =>
        {
            var quotesPerSecond = this.counter.Quotes / 10.0;
            var tradesPerSecond = this.counter.Trades / 10.0;
            this.counter = new Counter(0, 0);
        };
        timer.Start(); // Elapsed never fires until the timer is started
    }
    public void ProcessMessages(Messages messages)
    {
        foreach (var message in messages) { /* */ }
        var oldCounter = counter;
        this.counter = new Counter(oldCounter.Quotes, oldCounter.Trades);
    }
}
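I have many counters (not all shown), so this would mean many separate Interlocked.Increment calls on the individual counter fields.
The only other way I can think of is to lock each run of ProcessMessages (which would be extensive), and that is heavy for something that is merely a utility rather than something critical whose failure would crash the program.
Is it possible to use an immutable counter object this way, without hard interlocking/threading mechanisms, when we only need an update every 10 seconds?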
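For comparison, the per-field Interlocked approach is less code than it sounds if each batch adds its counts with one Interlocked.Add per field, rather than one Interlocked.Increment per message, and the timer harvests each field with Interlocked.Exchange. A minimal sketch, assuming just the two fields from the question; the class InterlockedCounters and its methods are hypothetical names:

```csharp
using System.Threading;

// Hypothetical alternative to the immutable Counter: one plain int field per metric.
public sealed class InterlockedCounters
{
    private int quotes;
    private int trades;

    // Called from the message-processing thread(s), once per batch of messages.
    public void Add(int quoteCount, int tradeCount)
    {
        Interlocked.Add(ref quotes, quoteCount);
        Interlocked.Add(ref trades, tradeCount);
    }

    // Called from the timer: returns the totals so far and resets each field to zero.
    // Each field is read-and-reset atomically on its own, but the pair is not one
    // consistent snapshot: an Add running between the two exchanges is split
    // across two intervals (nothing is lost or counted twice).
    public (int Quotes, int Trades) TakeAndReset()
    {
        int q = Interlocked.Exchange(ref quotes, 0);
        int t = Interlocked.Exchange(ref trades, 0);
        return (q, t);
    }
}
```

The cost is one atomic operation per field per batch, which is usually small next to deserializing the batch itself; what it gives up is a single consistent snapshot across all fields.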
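Flag-check idea to avoid locking
Could the timer thread set a flag for ProcessMessages to check, so that when ProcessMessages sees the flag set, it starts counting from zero again? i.e.: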
/* snipped the MyProcessor class, same as before */
System.Timers.Timer timer;
Counter counter = new Counter(0, 0);
ManualResetEvent reset = new ManualResetEvent(false);
public MyProcessor()
{
    // update every 10 seconds
    this.timer = new System.Timers.Timer(10000);
    timer.Elapsed += (sender, e) =>
    {
        var quotesPerSecond = this.counter.Quotes / 10.0;
        var tradesPerSecond = this.counter.Trades / 10.0;
        // log
        this.reset.Set(); // ask ProcessMessages to start again from zero
    };
    timer.Start();
}
// this should be called every second with a heartbeat message posted to queue
public void ProcessMessages(Messages messages)
{
    if (reset.WaitOne(0)) // non-blocking check of the flag
    {
        // the timer has logged the totals: start counting from zero again
        reset.Reset();
        this.counter = new Counter(0, 0);
    }
    // add this batch's counts (also on a reset, so the batch is not dropped)
    this.counter = new Counter(
        this.counter.Quotes + messages.Quotes.Count,
        this.counter.Trades + messages.Trades.Count);
}
/* end of MyProcessor class */
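This works, but the updates "stall" when message processing stops (throughput is very high, but it does pause for a few hours overnight, and ideally the display should then show the actual rate rather than the last value).
One way around this would be to post a heartbeat message to MyProcessor.ProcessMessages() every second, to force an internal update of the message counters and a reset whenever the reset ManualResetEvent is set.
Here are three new methods for the Counter class: one for reading the latest value from a specific location, one for safely updating a specific location, and one for easily creating a new Counter based on an existing one: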
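If the flag idea is kept, a ManualResetEvent is probably heavier than needed: it is a WaitHandle, and WaitOne(0) followed by Reset() is two separate steps. A plain int flipped with Interlocked.Exchange gives an atomic test-and-clear. A minimal sketch; ResetFlag, Set and TryConsume are hypothetical names:

```csharp
using System.Threading;

// Hypothetical lightweight replacement for the ManualResetEvent flag.
public sealed class ResetFlag
{
    private int requested; // 0 = no reset pending, 1 = reset pending

    // Timer thread: request a reset.
    public void Set() => Interlocked.Exchange(ref requested, 1);

    // Processing thread: returns true once per pending request and clears the flag
    // in the same atomic step, so there is no window between "check" and "clear".
    public bool TryConsume() => Interlocked.Exchange(ref requested, 0) == 1;
}
```

In ProcessMessages, a call to flag.TryConsume() would replace the WaitOne(0)/Reset() pair, and the timer would call flag.Set(). This only changes the flag; the timer still reads the counter field without synchronization.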
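// Reads the latest value: CompareExchange(null, null) never changes the field,
// but returns its current value with a full memory barrier.
public static Counter Read(ref Counter counter)
{
    return Interlocked.CompareExchange(ref counter, null, null);
}
// Applies updateFactory atomically: retries until no other thread has replaced
// the counter between reading it and swapping in the new instance.
public static void Update(ref Counter counter, Func<Counter, Counter> updateFactory)
{
    var counter1 = counter;
    while (true)
    {
        var newCounter = updateFactory(counter1);
        var counter2 = Interlocked.CompareExchange(ref counter, newCounter, counter1);
        if (counter2 == counter1) break; // swap succeeded
        counter1 = counter2;             // lost the race: retry with the newer value
    }
}
// Returns a new Counter with the deltas added; the existing instance is unchanged.
public Counter Add(int quotesDelta, int tradesDelta)
{
    return new Counter(Quotes + quotesDelta, Trades + tradesDelta);
}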
Usage example:
Counter latest = Counter.Read(ref this.counter);
Counter.Update(ref this.counter, existing => existing.Add(1, 1));
Accessing the MyProcessor.counter field directly from multiple threads concurrently is not thread-safe, because the field is neither volatile nor protected by a lock. The methods above are safe to use because they access the field through Interlocked operations.
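The same pattern can also cover the timer's "read, then start again from zero" step from the question. Reading the field and then assigning a new Counter(0, 0) is two steps, so updates landing in between would be lost; swapping in a fresh counter with Interlocked.Exchange does both at once. The sketch below repeats the answer's Counter members so it compiles on its own; TakeAndReset and ResetDemo are hypothetical additions, and the polling loop only stands in for the 10-second timer:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public sealed class Counter
{
    public Counter(int quotes, int trades) { Quotes = quotes; Trades = trades; }
    public readonly int Quotes;
    public readonly int Trades;

    public Counter Add(int quotesDelta, int tradesDelta) =>
        new Counter(Quotes + quotesDelta, Trades + tradesDelta);

    // CAS retry loop from the answer above.
    public static void Update(ref Counter counter, Func<Counter, Counter> updateFactory)
    {
        var counter1 = counter;
        while (true)
        {
            var newCounter = updateFactory(counter1);
            var counter2 = Interlocked.CompareExchange(ref counter, newCounter, counter1);
            if (counter2 == counter1) break;
            counter1 = counter2;
        }
    }

    // Hypothetical addition: atomically take the current snapshot and start a new
    // interval at zero. Every Update lands either in the returned snapshot or in
    // the fresh counter, so nothing is lost or counted twice.
    public static Counter TakeAndReset(ref Counter counter) =>
        Interlocked.Exchange(ref counter, new Counter(0, 0));
}

// Hypothetical demo: worker threads update while a "timer" loop harvests snapshots.
public static class ResetDemo
{
    private static Counter shared = new Counter(0, 0);

    public static (long Quotes, long Trades) Run(int workers, int updatesPerWorker)
    {
        shared = new Counter(0, 0);
        long quotes = 0, trades = 0;
        var work = Task.Run(() => Parallel.For(0, workers, _ =>
        {
            for (int i = 0; i < updatesPerWorker; i++)
                Counter.Update(ref shared, c => c.Add(1, 1));
        }));
        // Stand-in for the timer: harvest repeatedly until the workers finish.
        while (!work.IsCompleted)
        {
            var snapshot = Counter.TakeAndReset(ref shared);
            quotes += snapshot.Quotes;
            trades += snapshot.Trades;
            Thread.Sleep(1);
        }
        work.Wait();
        var last = Counter.TakeAndReset(ref shared); // updates after the last harvest
        return (quotes + last.Quotes, trades + last.Trades);
    }
}
```

Summing all harvested snapshots gives exactly workers × updatesPerWorker for each field, however the resets interleave with the updates.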
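I'd like to update everyone with my idea: the counter updates are pushed from within the processing thread itself.
Everything is driven by the DequeueThread loop, specifically the this.queue.ReceiveAsync(TimeSpan.FromSeconds(UpdateFrequencySeconds)) call.
It either returns an item from the queue, which is processed and the counters updated, or it times out and then the counters are updated. No other thread touches the counters: everything, including updating the message rates, is done inside that loop (after each await the loop may resume on a different thread-pool thread, but its steps never overlap, so the counters are still only accessed sequentially).
In short, nothing runs in parallel as far as dequeuing packets is concerned: the loop takes one item at a time, processes it, then handles the counters, and finally loops round to the next item in the queue.
This removes the need for synchronization: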
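using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;   // BufferBlock<T>, ReceiveAsync
using Microsoft.Extensions.Logging;      // ILogger<T>
// Not thread-safe by design: only ever used from the DequeueThread loop.
internal class Counter
{
    public Counter(Action<int, int, int, int> updateCallback, double updateEvery)
    {
        this.updateCallback = updateCallback;
        this.UpdateEvery = updateEvery;
        // first report after one full interval, not on the very first Poll
        this.nextUpdate = DateTimeOffset.UtcNow.AddSeconds(updateEvery);
    }
    // Called after every item or receive timeout; reports and resets once per interval.
    public void Poll()
    {
        if (nextUpdate < DateTimeOffset.UtcNow)
        {
            // post the stats, and reset
            this.updateCallback(this.quotes, this.trades, this.aggregates, this.statuses);
            this.quotes = 0;
            this.trades = 0;
            this.aggregates = 0;
            this.statuses = 0;
            nextUpdate = DateTimeOffset.UtcNow.AddSeconds(this.UpdateEvery);
        }
    }
    public void AddQuotes(int count) => this.quotes += count;
    public void AddTrades(int count) => this.trades += count;
    public void AddAggregates(int count) => this.aggregates += count;
    public void AddStatuses(int count) => this.statuses += count;
    private int quotes;
    private int trades;
    private int aggregates;
    private int statuses;
    private readonly Action<int, int, int, int> updateCallback;
    public double UpdateEvery { get; private set; }
    private DateTimeOffset nextUpdate;
}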
public class DeserializeWorker
{
    private readonly BufferBlock<byte[]> queue = new BufferBlock<byte[]>();
    private readonly IPolygonDeserializer polygonDeserializer;
    private readonly ILogger<DeserializeWorker> logger;
    private readonly Counter counter;
    const double UpdateFrequencySeconds = 5.0;
    long maxBacklog = 0;
    public DeserializeWorker(IPolygonDeserializer polygonDeserializer, ILogger<DeserializeWorker> logger)
    {
        this.polygonDeserializer = polygonDeserializer ?? throw new ArgumentNullException(nameof(polygonDeserializer));
        this.logger = logger;
        this.counter = new Counter(ProcesCounterUpdateCallback, UpdateFrequencySeconds);
    }
    public void Add(byte[] data)
    {
        this.queue.Post(data);
    }
    public Task Run(CancellationToken stoppingToken)
    {
        return Task
            .Factory
            .StartNew(
                async () => await DequeueThread(stoppingToken),
                stoppingToken,
                TaskCreationOptions.LongRunning,
                TaskScheduler.Default)
            .Unwrap();
    }
    private async Task DequeueThread(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                // returns the next item, or throws TimeoutException after UpdateFrequencySeconds
                var item = await this.queue.ReceiveAsync(TimeSpan.FromSeconds(UpdateFrequencySeconds), stoppingToken);
                await ProcessAsync(item);
            }
            catch (TimeoutException)
            {
                // this is ok, timeout expired: still fall through to UpdateCounters()
            }
            catch (OperationCanceledException)
            {
                break; // cancelled (TaskCanceledException derives from this), break from loop
            }
            catch (Exception e)
            {
                this.logger.LogError(e.ToString());
            }
            UpdateCounters();
        }
        await StopAsync();
    }
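    protected async Task StopAsync()
    {
        this.queue.Complete();
        // Completion only finishes once the buffer is empty, and nothing is receiving
        // any more, so discard the remaining items to avoid waiting forever.
        this.queue.TryReceiveAll(out _);
        await this.queue.Completion;
    }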
    protected void ProcessStatuses(IEnumerable<Status> statuses)
    {
        Parallel.ForEach(statuses, (current) =>
        {
            if (current.Result != "success")
                this.logger.LogInformation($"{current.Result}: {current.Message}");
        });
    }
    protected void ProcessMessages<T>(IEnumerable<T> messages)
    {
        Parallel.ForEach(messages, (current) =>
        {
            // serialize by type T
            // dispatch
        });
    }
    async Task ProcessAsync(byte[] item)
    {
        try
        {
            using var memoryStream = new MemoryStream(item);
            var message = await this.polygonDeserializer.DeserializeAsync(memoryStream);
            var messagesTask = Task.Run(() => ProcessStatuses(message.Statuses));
            var quotesTask = Task.Run(() => ProcessMessages(message.Quotes));
            var tradesTask = Task.Run(() => ProcessMessages(message.Trades));
            var aggregatesTask = Task.Run(() => ProcessMessages(message.Aggregates));
            // the counters are only touched here, on the dequeue loop, never inside the tasks
            this.counter.AddStatuses(message.Statuses.Count);
            this.counter.AddQuotes(message.Quotes.Count);
            this.counter.AddTrades(message.Trades.Count);
            this.counter.AddAggregates(message.Aggregates.Count);
            // await instead of Task.WaitAll, so no thread is blocked while waiting
            await Task.WhenAll(messagesTask, quotesTask, aggregatesTask, tradesTask);
        }
        catch (Exception e)
        {
            this.logger.LogError(e.ToString());
        }
    }
    void UpdateCounters()
    {
        var currentCount = this.queue.Count;
        if (currentCount > this.maxBacklog)
            this.maxBacklog = currentCount;
        this.counter.Poll();
    }
    void ProcesCounterUpdateCallback(int quotes, int trades, int aggregates, int statuses)
    {
        var updateFrequency = this.counter.UpdateEvery;
        logger.LogInformation(
            $"Queue current {this.queue.Count} (max {this.maxBacklog}), {quotes / updateFrequency} quotes/sec, {trades / updateFrequency} trades/sec, {aggregates / updateFrequency} aggregates/sec, {statuses / updateFrequency} status/sec");
    }
}
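The loop described above (receive with a timeout, process, then poll the counters, all in one sequential loop) does not depend on TPL Dataflow. As a hedged sketch, the same structure can be written with System.Threading.Channels, which is part of the base library in modern .NET. SingleConsumerWorker, reportEvery and Reports are hypothetical names, and int[] stands in for the real deserialized message batches:

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical variant of DeserializeWorker's loop. Producers may call Add from any
// thread; the counters are touched only by the single consumer loop in RunAsync.
public sealed class SingleConsumerWorker
{
    private readonly Channel<int[]> channel =
        Channel.CreateUnbounded<int[]>(new UnboundedChannelOptions { SingleReader = true });
    private readonly TimeSpan reportEvery;
    private DateTime nextReport;
    private long itemsSinceLastReport;                      // consumer loop only

    public List<long> Reports { get; } = new List<long>();  // consumer loop only

    public SingleConsumerWorker(TimeSpan reportEvery)
    {
        this.reportEvery = reportEvery;
        nextReport = DateTime.UtcNow + reportEvery;
    }

    public bool Add(int[] batch) => channel.Writer.TryWrite(batch);
    public void Complete() => channel.Writer.TryComplete();

    public async Task RunAsync(CancellationToken stoppingToken)
    {
        while (true)
        {
            // Wake up when data arrives or after reportEvery, whichever comes first,
            // so the rate is still reported while the queue is idle.
            using var wait = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken);
            wait.CancelAfter(reportEvery);
            try
            {
                if (!await channel.Reader.WaitToReadAsync(wait.Token))
                    break; // writer completed and channel drained
                // Drain what is available, but stop when a report is due.
                while (DateTime.UtcNow < nextReport && channel.Reader.TryRead(out var batch))
                    itemsSinceLastReport += batch.Length; // "process" the batch
            }
            catch (OperationCanceledException) when (!stoppingToken.IsCancellationRequested)
            {
                // Idle timeout: nothing to read, fall through to the report check.
            }
            if (DateTime.UtcNow >= nextReport)
                Flush();
        }
        Flush(); // report whatever is left before exiting
    }

    private void Flush()
    {
        Reports.Add(itemsSinceLastReport);
        itemsSinceLastReport = 0;
        nextReport = DateTime.UtcNow + reportEvery;
    }
}
```

As in the Dataflow version, cancellation of stoppingToken propagates out of RunAsync, while the idle timeout only triggers a report check. Creating a linked CancellationTokenSource per wake-up has some cost, but because each wake-up drains every batch already queued, that cost is paid per wake-up rather than per message.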