如何处理只需要数据库 C# 中符号的 "last" 值的高速数据流
How to process a high speed stream of data that only requires the "last" value for a symbol in a database C#
我有来自供应商的高速股票价格流...可能每秒 5000。 (大约 8000 个不同的符号)
我的数据库中有一个 table (SymbolPrice) 需要更新为最新的最后价格。
我似乎无法使数据库更新速度足够快,无法处理最新价格的队列。
我在 Azure Sql 服务器数据库上,所以我能够将数据库升级到支持内存中 tables 的高级版本,并使我的 SymbolPrice table内存中 table...但仍然不够好。
如果它最终跳过一个价格,这不是问题,只要最新的价格尽可能快地到达那里...所以如果我连续 10 个被炸毁...只有最后一个需要写...这听起来很简单,除了连续的 10 个可能与其他符号混合。
因此,我当前的解决方案是使用 ConcurrentDictionary 仅保存最新价格。并使用符号队列将更新推送到数据库(参见下面的代码)...但这仍然不够快。
解决这个问题的一种方法是简单地重复遍历整个字典...并用最新价格更新数据库...但这有点浪费,因为我也会这样做正在更新可能仅每隔几分钟更新一次的值,其更新速率与每秒更新多次的值相同。
有什么想法可以做得更好吗?
谢谢!
布莱恩
public ConcurrentDictionary<string, QuoddLastPriceCache.PriceData> _lastPrices = new ConcurrentDictionary<string, QuoddLastPriceCache.PriceData>();
public ConcurrentQueue<string> _lastPriceSymbolsToUpdate = new ConcurrentQueue<string>();
public void Start()
{
Task.Run(() => { UpdateLastPricesTask(services); });
lastPriceCache.PriceReceived += (symbol, priceData) =>
{
_lastPrices.AddOrUpdate(symbol, priceData, (key, value) => { return priceData; });
_lastPriceSymbolsToUpdate.Enqueue(symbol);
};
}
private void UpdateLastPricesTask(IServiceProvider services)
{
_lastPriceUpdatesRunning = true;
while (_lastPriceUpdatesRunning)
{
if (_lastPriceSymbolsToUpdate.TryDequeue(out string symbol))
{
if (_lastPrices.TryRemove(symbol, out QuoddLastPriceCache.PriceData priceData))
{
// write to database
if (_lastPriceScope == null || _lastScopeCreate + TimeSpan.FromSeconds(60 * 5) < DateTime.UtcNow)
{
if (_lastPriceScope != null)
_lastPriceScope.Dispose();
_lastPriceScope = services.CreateScope();
}
var unitOfWork = _lastPriceScope.ServiceProvider.GetRequiredService<IUnitOfWork>();
unitOfWork.SymbolPrice.UpdateLastPrice(symbol, priceData.Price, priceData.Timestamp);
}
}
else
Thread.Sleep(1);
}
}
您需要使用能够查询流的工具,SQL 不是最适合它的工具。搜索复杂事件处理和 Kafka/事件中心 + 流分析。
我能做的最好的是以下方法...我将最后一个值保存在字典中并添加一个标志以表明它是否已写入数据库...然后传递数据并将更新的值写入数据库...这样我只更新最近更新的值。效果很好...不过似乎应该有更好的方法。
public void Start()
{
Task.Run(() => { UpdateLastPricesTask(services); });
LastPriceCache.PriceReceived += (symbol, priceData) =>
{
_lastPrices.AddOrUpdate(symbol, priceData, (key, value) => { return priceData; });
};
}
public ConcurrentDictionary<string, PriceData> _lastPrices = new ConcurrentDictionary<string, PriceData>();
public bool _lastPriceUpdatesRunning;
public DateTime _lastScopeCreate = DateTime.MinValue;
public IServiceScope _lastPriceScope = null;
private void UpdateLastPricesTask(IServiceProvider services)
{
_lastPriceUpdatesRunning = true;
while (_lastPriceUpdatesRunning)
{
var processed = 0;
foreach (var symbol in _lastPrices.Keys)
{
if (_lastPrices.TryGetValue(symbol, out QuoddLastPriceCache.PriceData priceData))
{
if (priceData.WrittenToDatabase == false)
{
// create a new scope every 5 minutes
if (_lastPriceScope == null || _lastScopeCreate + TimeSpan.FromSeconds(60 * 5) < DateTime.UtcNow)
{
if (_lastPriceScope != null)
_lastPriceScope.Dispose();
_lastPriceScope = services.CreateScope();
}
// write to database
var unitOfWork = _lastPriceScope.ServiceProvider.GetRequiredService<IUnitOfWork>();
unitOfWork.SymbolPrice.UpdateLastPrice(symbol, priceData.Price, priceData.Timestamp);
priceData.WrittenToDatabase = true;
processed++;
}
}
}
if (processed > 0)
Thread.Sleep(1);
else
Thread.Sleep(1000 * 1);
}
}
我有来自供应商的高速股票价格流...可能每秒 5000。 (大约 8000 个不同的符号)
我的数据库中有一个 table (SymbolPrice) 需要更新为最新的最后价格。
我似乎无法使数据库更新速度足够快,无法处理最新价格的队列。
我在 Azure Sql 服务器数据库上,所以我能够将数据库升级到支持内存中 tables 的高级版本,并使我的 SymbolPrice table内存中 table...但仍然不够好。
如果它最终跳过一个价格,这不是问题,只要最新的价格尽可能快地到达那里...所以如果我连续 10 个被炸毁...只有最后一个需要写...这听起来很简单,除了连续的 10 个可能与其他符号混合。
因此,我当前的解决方案是使用 ConcurrentDictionary 仅保存最新价格。并使用符号队列将更新推送到数据库(参见下面的代码)...但这仍然不够快。
解决这个问题的一种方法是简单地重复遍历整个字典...并用最新价格更新数据库...但这有点浪费,因为我也会这样做正在更新可能仅每隔几分钟更新一次的值,其更新速率与每秒更新多次的值相同。
有什么想法可以做得更好吗?
谢谢!
布莱恩
public ConcurrentDictionary<string, QuoddLastPriceCache.PriceData> _lastPrices = new ConcurrentDictionary<string, QuoddLastPriceCache.PriceData>(); public ConcurrentQueue<string> _lastPriceSymbolsToUpdate = new ConcurrentQueue<string>(); public void Start() { Task.Run(() => { UpdateLastPricesTask(services); }); lastPriceCache.PriceReceived += (symbol, priceData) => { _lastPrices.AddOrUpdate(symbol, priceData, (key, value) => { return priceData; }); _lastPriceSymbolsToUpdate.Enqueue(symbol); }; } private void UpdateLastPricesTask(IServiceProvider services) { _lastPriceUpdatesRunning = true; while (_lastPriceUpdatesRunning) { if (_lastPriceSymbolsToUpdate.TryDequeue(out string symbol)) { if (_lastPrices.TryRemove(symbol, out QuoddLastPriceCache.PriceData priceData)) { // write to database if (_lastPriceScope == null || _lastScopeCreate + TimeSpan.FromSeconds(60 * 5) < DateTime.UtcNow) { if (_lastPriceScope != null) _lastPriceScope.Dispose(); _lastPriceScope = services.CreateScope(); } var unitOfWork = _lastPriceScope.ServiceProvider.GetRequiredService<IUnitOfWork>(); unitOfWork.SymbolPrice.UpdateLastPrice(symbol, priceData.Price, priceData.Timestamp); } } else Thread.Sleep(1); } }
您需要使用能够查询流的工具,SQL 不是最适合它的工具。搜索复杂事件处理和 Kafka/事件中心 + 流分析。
我能做的最好的是以下方法...我将最后一个值保存在字典中并添加一个标志以表明它是否已写入数据库...然后传递数据并将更新的值写入数据库...这样我只更新最近更新的值。效果很好...不过似乎应该有更好的方法。
public void Start()
{
Task.Run(() => { UpdateLastPricesTask(services); });
LastPriceCache.PriceReceived += (symbol, priceData) =>
{
_lastPrices.AddOrUpdate(symbol, priceData, (key, value) => { return priceData; });
};
}
public ConcurrentDictionary<string, PriceData> _lastPrices = new ConcurrentDictionary<string, PriceData>();
public bool _lastPriceUpdatesRunning;
public DateTime _lastScopeCreate = DateTime.MinValue;
public IServiceScope _lastPriceScope = null;
private void UpdateLastPricesTask(IServiceProvider services)
{
_lastPriceUpdatesRunning = true;
while (_lastPriceUpdatesRunning)
{
var processed = 0;
foreach (var symbol in _lastPrices.Keys)
{
if (_lastPrices.TryGetValue(symbol, out QuoddLastPriceCache.PriceData priceData))
{
if (priceData.WrittenToDatabase == false)
{
// create a new scope every 5 minutes
if (_lastPriceScope == null || _lastScopeCreate + TimeSpan.FromSeconds(60 * 5) < DateTime.UtcNow)
{
if (_lastPriceScope != null)
_lastPriceScope.Dispose();
_lastPriceScope = services.CreateScope();
}
// write to database
var unitOfWork = _lastPriceScope.ServiceProvider.GetRequiredService<IUnitOfWork>();
unitOfWork.SymbolPrice.UpdateLastPrice(symbol, priceData.Price, priceData.Timestamp);
priceData.WrittenToDatabase = true;
processed++;
}
}
}
if (processed > 0)
Thread.Sleep(1);
else
Thread.Sleep(1000 * 1);
}
}