批量编辑动态数据时如何产生单次更新
How to produce single updates when batch editing in dynamic data
首先让我说一下,我对 Reactive Extensions 和 DynamicData 都是新手,所以我可能在这里遗漏了一些非常明显的东西。
目标:
我想根据新的市场价格计算一些交易的利润(或损失)(类似于:https://dynamic-data.org/2014/11/22/trading-example-part-2-manage-market-data/)。
为简单起见,假设最后一秒收到的所有价格都被视为新价格。
我的问题:即使使用批量编辑,1 秒内相同货币(例如 EURUSD)也会发生多个更新事件。理想情况下,我只想根据最新的可用值引发 1 个事件,以避免进行不必要的计算。
到目前为止我的代码:
Main.cs
using System.Reactive.Linq;
using DynamicData;
TickService tickService = new();
tickService.NewTicks
.Connect()
.Watch("EURUSD")
.Subscribe(price => Console.WriteLine(price));
/*
* In the actual project the prices come from an external system,
* but that is irrelevant here
* so for the sake of simplicity I'm adding them manually (with a delay)
* to TickService
*/
Task.Run(() =>
{
for (int i = 0; i < 10; i++)
{
var newTicks = new List<Tick>() { new Tick() { Name = "EURUSD", Price = i, LastUpdate = DateTime.UtcNow } };
tickService.AddTicks(newTicks);
Console.WriteLine($"Added: i -> {i}");
Thread.Sleep(250);
}
});
Console.ReadLine();
Tick.cs
public class Tick
{
public string Name { get; set; } = "";
public double Price { get; set; }
public DateTime LastUpdate { get; set; }
public override string ToString()
{
return $"Name: {Name}, Price: {Price}, LastUpdate: {LastUpdate}";
}
}
TickService.cs(使用最后一秒的价格公开 IObservableCache)
using DynamicData;
public class TickService
{
private readonly SourceCache<Tick, string> _ticksCache = new(x => x.Name);
public IObservableCache<Tick, string> NewTicks { get; }
public TickService()
{
NewTicks = _ticksCache
.Connect()
.Filter(tick => tick.LastUpdate > DateTime.UtcNow - TimeSpan.FromSeconds(1))
.Batch(TimeSpan.FromSeconds(1))
.AsObservableCache();
}
public void AddTicks(ICollection<Tick> newTicks)
{
_ticksCache.Edit(innerCache => innerCache.AddOrUpdate(newTicks));
}
}
以上代码产生以下结果:
Added: i -> 0
Added: i -> 1
Added: i -> 2
Added: i -> 3
Add, Key: EURUSD, Current: Name: EURUSD, Price: 0, LastUpdate: 14/03/2022 4:14:20 pm, Previous: <None>
Update, Key: EURUSD, Current: Name: EURUSD, Price: 1, LastUpdate: 14/03/2022 4:14:20 pm, Previous: Name: EURUSD, Price: 0, LastUpdate: 14/03/2022 4:14:20 pm
Update, Key: EURUSD, Current: Name: EURUSD, Price: 2, LastUpdate: 14/03/2022 4:14:21 pm, Previous: Name: EURUSD, Price: 1, LastUpdate: 14/03/2022 4:14:20 pm
Update, Key: EURUSD, Current: Name: EURUSD, Price: 3, LastUpdate: 14/03/2022 4:14:21 pm, Previous: Name: EURUSD, Price: 2, LastUpdate: 14/03/2022 4:14:21 pm
Added: i -> 4
Added: i -> 5
Added: i -> 6
Added: i -> 7
Update, Key: EURUSD, Current: Name: EURUSD, Price: 4, LastUpdate: 14/03/2022 4:14:21 pm, Previous: Name: EURUSD, Price: 3, LastUpdate: 14/03/2022 4:14:21 pm
Update, Key: EURUSD, Current: Name: EURUSD, Price: 5, LastUpdate: 14/03/2022 4:14:21 pm, Previous: Name: EURUSD, Price: 4, LastUpdate: 14/03/2022 4:14:21 pm
Update, Key: EURUSD, Current: Name: EURUSD, Price: 6, LastUpdate: 14/03/2022 4:14:22 pm, Previous: Name: EURUSD, Price: 5, LastUpdate: 14/03/2022 4:14:21 pm
Update, Key: EURUSD, Current: Name: EURUSD, Price: 7, LastUpdate: 14/03/2022 4:14:22 pm, Previous: Name: EURUSD, Price: 6, LastUpdate: 14/03/2022 4:14:22 pm
Added: i -> 8
Added: i -> 9
Update, Key: EURUSD, Current: Name: EURUSD, Price: 8, LastUpdate: 14/03/2022 4:14:22 pm, Previous: Name: EURUSD, Price: 7, LastUpdate: 14/03/2022 4:14:22 pm
Update, Key: EURUSD, Current: Name: EURUSD, Price: 9, LastUpdate: 14/03/2022 4:14:22 pm, Previous: Name: EURUSD, Price: 8, LastUpdate: 14/03/2022 4:14:22 pm
我想要的是:
Added: i -> 0
Added: i -> 1
Added: i -> 2
Added: i -> 3
Update, Key: EURUSD, Current: Name: EURUSD, Price: 3, LastUpdate: 14/03/2022 4:14:21 pm, Previous: Name: EURUSD, Price: 2, LastUpdate: 14/03/2022 4:14:21 pm
Added: i -> 4
Added: i -> 5
Added: i -> 6
Added: i -> 7
Update, Key: EURUSD, Current: Name: EURUSD, Price: 7, LastUpdate: 14/03/2022 4:14:22 pm, Previous: Name: EURUSD, Price: 6, LastUpdate: 14/03/2022 4:14:22 pm
Added: i -> 8
Added: i -> 9
Update, Key: EURUSD, Current: Name: EURUSD, Price: 9, LastUpdate: 14/03/2022 4:14:22 pm, Previous: Name: EURUSD, Price: 8, LastUpdate: 14/03/2022 4:14:22 pm
由于我使用的是批量编辑,因此我希望每秒为 EURUSD 生成一个更新事件。 ideas/advice 关于我在这里遗漏了什么?
谢谢
这是几个人长期提出的要求,但从未具体化为运算符,因此我向动态数据添加了一个新运算符,以在更改集通知中实现唯一性。
新运算符被称为EnsureUniqueKeys
,可以应用于缓存
var uniqueChanges = _myCache.Connect().EnsureUniqueKeys()
哪里
_myCache.Edit(innerCache =>
{
innerCache.AddOrUpdate(new Person("Me", 20));
innerCache.AddOrUpdate(new Person("Me", 21));
innerCache.AddOrUpdate(new Person("Me", 22));
});
将为 Person("Me", 22)
生成一个添加通知
和
_myCache.Edit(innerCache =>
{
innerCache.AddOrUpdate(new Person("Me", 20));
innerCache.AddOrUpdate(new Person("Me", 21));
innerCache.RemoveKey("Me");
});
将产生一个空的更改集,因为在同一编辑中添加、更新和删除了项目。
运营商可以在TickService中这样申请:
NewTicks = _ticksCache
.Connect()
.Filter(tick => tick.LastUpdate > DateTime.UtcNow - TimeSpan.FromSeconds(1))
.Batch(TimeSpan.FromSeconds(1))
.EnsureUniqueKeys()
.AsObservableCache();
首先让我说一下,我对 Reactive Extensions 和 DynamicData 都是新手,所以我可能在这里遗漏了一些非常明显的东西。
目标: 我想根据新的市场价格计算一些交易的利润(或损失)(类似于:https://dynamic-data.org/2014/11/22/trading-example-part-2-manage-market-data/)。 为简单起见,假设最后一秒收到的所有价格都被视为新价格。
我的问题:即使使用批量编辑,1 秒内相同货币(例如 EURUSD)也会发生多个更新事件。理想情况下,我只想根据最新的可用值引发 1 个事件,以避免进行不必要的计算。
到目前为止我的代码:
Main.cs
using System.Reactive.Linq;
using DynamicData;
TickService tickService = new();
tickService.NewTicks
.Connect()
.Watch("EURUSD")
.Subscribe(price => Console.WriteLine(price));
/*
* In the actual project the prices come from an external system,
* but that is irrelevant here
* so for the sake of simplicity I'm adding them manually (with a delay)
* to TickService
*/
Task.Run(() =>
{
for (int i = 0; i < 10; i++)
{
var newTicks = new List<Tick>() { new Tick() { Name = "EURUSD", Price = i, LastUpdate = DateTime.UtcNow } };
tickService.AddTicks(newTicks);
Console.WriteLine($"Added: i -> {i}");
Thread.Sleep(250);
}
});
Console.ReadLine();
Tick.cs
public class Tick
{
public string Name { get; set; } = "";
public double Price { get; set; }
public DateTime LastUpdate { get; set; }
public override string ToString()
{
return $"Name: {Name}, Price: {Price}, LastUpdate: {LastUpdate}";
}
}
TickService.cs(使用最后一秒的价格公开 IObservableCache)
using DynamicData;
public class TickService
{
private readonly SourceCache<Tick, string> _ticksCache = new(x => x.Name);
public IObservableCache<Tick, string> NewTicks { get; }
public TickService()
{
NewTicks = _ticksCache
.Connect()
.Filter(tick => tick.LastUpdate > DateTime.UtcNow - TimeSpan.FromSeconds(1))
.Batch(TimeSpan.FromSeconds(1))
.AsObservableCache();
}
public void AddTicks(ICollection<Tick> newTicks)
{
_ticksCache.Edit(innerCache => innerCache.AddOrUpdate(newTicks));
}
}
以上代码产生以下结果:
Added: i -> 0
Added: i -> 1
Added: i -> 2
Added: i -> 3
Add, Key: EURUSD, Current: Name: EURUSD, Price: 0, LastUpdate: 14/03/2022 4:14:20 pm, Previous: <None>
Update, Key: EURUSD, Current: Name: EURUSD, Price: 1, LastUpdate: 14/03/2022 4:14:20 pm, Previous: Name: EURUSD, Price: 0, LastUpdate: 14/03/2022 4:14:20 pm
Update, Key: EURUSD, Current: Name: EURUSD, Price: 2, LastUpdate: 14/03/2022 4:14:21 pm, Previous: Name: EURUSD, Price: 1, LastUpdate: 14/03/2022 4:14:20 pm
Update, Key: EURUSD, Current: Name: EURUSD, Price: 3, LastUpdate: 14/03/2022 4:14:21 pm, Previous: Name: EURUSD, Price: 2, LastUpdate: 14/03/2022 4:14:21 pm
Added: i -> 4
Added: i -> 5
Added: i -> 6
Added: i -> 7
Update, Key: EURUSD, Current: Name: EURUSD, Price: 4, LastUpdate: 14/03/2022 4:14:21 pm, Previous: Name: EURUSD, Price: 3, LastUpdate: 14/03/2022 4:14:21 pm
Update, Key: EURUSD, Current: Name: EURUSD, Price: 5, LastUpdate: 14/03/2022 4:14:21 pm, Previous: Name: EURUSD, Price: 4, LastUpdate: 14/03/2022 4:14:21 pm
Update, Key: EURUSD, Current: Name: EURUSD, Price: 6, LastUpdate: 14/03/2022 4:14:22 pm, Previous: Name: EURUSD, Price: 5, LastUpdate: 14/03/2022 4:14:21 pm
Update, Key: EURUSD, Current: Name: EURUSD, Price: 7, LastUpdate: 14/03/2022 4:14:22 pm, Previous: Name: EURUSD, Price: 6, LastUpdate: 14/03/2022 4:14:22 pm
Added: i -> 8
Added: i -> 9
Update, Key: EURUSD, Current: Name: EURUSD, Price: 8, LastUpdate: 14/03/2022 4:14:22 pm, Previous: Name: EURUSD, Price: 7, LastUpdate: 14/03/2022 4:14:22 pm
Update, Key: EURUSD, Current: Name: EURUSD, Price: 9, LastUpdate: 14/03/2022 4:14:22 pm, Previous: Name: EURUSD, Price: 8, LastUpdate: 14/03/2022 4:14:22 pm
我想要的是:
Added: i -> 0
Added: i -> 1
Added: i -> 2
Added: i -> 3
Update, Key: EURUSD, Current: Name: EURUSD, Price: 3, LastUpdate: 14/03/2022 4:14:21 pm, Previous: Name: EURUSD, Price: 2, LastUpdate: 14/03/2022 4:14:21 pm
Added: i -> 4
Added: i -> 5
Added: i -> 6
Added: i -> 7
Update, Key: EURUSD, Current: Name: EURUSD, Price: 7, LastUpdate: 14/03/2022 4:14:22 pm, Previous: Name: EURUSD, Price: 6, LastUpdate: 14/03/2022 4:14:22 pm
Added: i -> 8
Added: i -> 9
Update, Key: EURUSD, Current: Name: EURUSD, Price: 9, LastUpdate: 14/03/2022 4:14:22 pm, Previous: Name: EURUSD, Price: 8, LastUpdate: 14/03/2022 4:14:22 pm
由于我使用的是批量编辑,因此我希望每秒为 EURUSD 生成一个更新事件。 ideas/advice 关于我在这里遗漏了什么?
谢谢
这是几个人长期提出的要求,但从未具体化为运算符,因此我向动态数据添加了一个新运算符,以在更改集通知中实现唯一性。
新运算符被称为EnsureUniqueKeys
,可以应用于缓存
var uniqueChanges = _myCache.Connect().EnsureUniqueKeys()
哪里
_myCache.Edit(innerCache =>
{
innerCache.AddOrUpdate(new Person("Me", 20));
innerCache.AddOrUpdate(new Person("Me", 21));
innerCache.AddOrUpdate(new Person("Me", 22));
});
将为 Person("Me", 22)
生成一个添加通知和
_myCache.Edit(innerCache =>
{
innerCache.AddOrUpdate(new Person("Me", 20));
innerCache.AddOrUpdate(new Person("Me", 21));
innerCache.RemoveKey("Me");
});
将产生一个空的更改集,因为在同一编辑中添加、更新和删除了项目。
运营商可以在TickService中这样申请:
NewTicks = _ticksCache
.Connect()
.Filter(tick => tick.LastUpdate > DateTime.UtcNow - TimeSpan.FromSeconds(1))
.Batch(TimeSpan.FromSeconds(1))
.EnsureUniqueKeys()
.AsObservableCache();