批量编辑动态数据时如何产生单次更新

How to produce single updates when batch editing in dynamic data

首先让我说一下,我对 Reactive Extensions 和 DynamicData 都是新手,所以我可能在这里遗漏了一些非常明显的东西。

目标: 我想根据新的市场价格计算一些交易的利润(或损失)(类似于:https://dynamic-data.org/2014/11/22/trading-example-part-2-manage-market-data/)。 为简单起见,假设最后一秒收到的所有价格都被视为新价格。

我的问题:即使使用批量编辑,1 秒内相同货币(例如 EURUSD)也会发生多个更新事件。理想情况下,我只想根据最新的可用值引发 1 个事件,以避免进行不必要的计算。

到目前为止我的代码:

Main.cs

using System.Reactive.Linq;
using DynamicData;


TickService tickService = new();

tickService.NewTicks
    .Connect()
    .Watch("EURUSD")
    .Subscribe(price => Console.WriteLine(price));

/*
 * In the actual project the prices come from an external system,
 * but that is irrelevant here 
 * so for the sake of simplicity I'm adding them manually (with a delay)
 * to TickService
 */
Task.Run(() =>
{
    for (int i = 0; i < 10; i++)
    {
        var newTicks = new List<Tick>() { new Tick() { Name = "EURUSD", Price = i, LastUpdate = DateTime.UtcNow } };
        tickService.AddTicks(newTicks);
        Console.WriteLine($"Added: i -> {i}");
        Thread.Sleep(250);
    }
});

Console.ReadLine();

Tick.cs

public class Tick
{
    public string Name { get; set; } = "";
    public double Price { get; set; }
    public DateTime LastUpdate { get; set; }

    public override string ToString()
    {
        return $"Name: {Name}, Price: {Price}, LastUpdate: {LastUpdate}";
    }
}

TickService.cs(使用最后一秒的价格公开 IObservableCache)

using DynamicData;

public class TickService
{
    private readonly SourceCache<Tick, string> _ticksCache = new(x => x.Name);
    public IObservableCache<Tick, string> NewTicks { get; }

    public TickService()
    {
        NewTicks = _ticksCache
             .Connect()
             .Filter(tick => tick.LastUpdate > DateTime.UtcNow - TimeSpan.FromSeconds(1))
             .Batch(TimeSpan.FromSeconds(1))
             .AsObservableCache();
    }

    public void AddTicks(ICollection<Tick> newTicks)
    {
        _ticksCache.Edit(innerCache => innerCache.AddOrUpdate(newTicks));
    }
}

以上代码产生以下结果:

Added: i -> 0
Added: i -> 1
Added: i -> 2
Added: i -> 3
Add, Key: EURUSD, Current: Name: EURUSD, Price: 0, LastUpdate: 14/03/2022 4:14:20 pm, Previous: <None>
Update, Key: EURUSD, Current: Name: EURUSD, Price: 1, LastUpdate: 14/03/2022 4:14:20 pm, Previous: Name: EURUSD, Price: 0, LastUpdate: 14/03/2022 4:14:20 pm
Update, Key: EURUSD, Current: Name: EURUSD, Price: 2, LastUpdate: 14/03/2022 4:14:21 pm, Previous: Name: EURUSD, Price: 1, LastUpdate: 14/03/2022 4:14:20 pm
Update, Key: EURUSD, Current: Name: EURUSD, Price: 3, LastUpdate: 14/03/2022 4:14:21 pm, Previous: Name: EURUSD, Price: 2, LastUpdate: 14/03/2022 4:14:21 pm
Added: i -> 4
Added: i -> 5
Added: i -> 6
Added: i -> 7
Update, Key: EURUSD, Current: Name: EURUSD, Price: 4, LastUpdate: 14/03/2022 4:14:21 pm, Previous: Name: EURUSD, Price: 3, LastUpdate: 14/03/2022 4:14:21 pm
Update, Key: EURUSD, Current: Name: EURUSD, Price: 5, LastUpdate: 14/03/2022 4:14:21 pm, Previous: Name: EURUSD, Price: 4, LastUpdate: 14/03/2022 4:14:21 pm
Update, Key: EURUSD, Current: Name: EURUSD, Price: 6, LastUpdate: 14/03/2022 4:14:22 pm, Previous: Name: EURUSD, Price: 5, LastUpdate: 14/03/2022 4:14:21 pm
Update, Key: EURUSD, Current: Name: EURUSD, Price: 7, LastUpdate: 14/03/2022 4:14:22 pm, Previous: Name: EURUSD, Price: 6, LastUpdate: 14/03/2022 4:14:22 pm
Added: i -> 8
Added: i -> 9
Update, Key: EURUSD, Current: Name: EURUSD, Price: 8, LastUpdate: 14/03/2022 4:14:22 pm, Previous: Name: EURUSD, Price: 7, LastUpdate: 14/03/2022 4:14:22 pm
Update, Key: EURUSD, Current: Name: EURUSD, Price: 9, LastUpdate: 14/03/2022 4:14:22 pm, Previous: Name: EURUSD, Price: 8, LastUpdate: 14/03/2022 4:14:22 pm

我想要的是:

Added: i -> 0
Added: i -> 1
Added: i -> 2
Added: i -> 3
Update, Key: EURUSD, Current: Name: EURUSD, Price: 3, LastUpdate: 14/03/2022 4:14:21 pm, Previous: Name: EURUSD, Price: 2, LastUpdate: 14/03/2022 4:14:21 pm
Added: i -> 4
Added: i -> 5
Added: i -> 6
Added: i -> 7
Update, Key: EURUSD, Current: Name: EURUSD, Price: 7, LastUpdate: 14/03/2022 4:14:22 pm, Previous: Name: EURUSD, Price: 6, LastUpdate: 14/03/2022 4:14:22 pm
Added: i -> 8
Added: i -> 9
Update, Key: EURUSD, Current: Name: EURUSD, Price: 9, LastUpdate: 14/03/2022 4:14:22 pm, Previous: Name: EURUSD, Price: 8, LastUpdate: 14/03/2022 4:14:22 pm

由于我使用的是批量编辑,因此我希望每秒为 EURUSD 生成一个更新事件。 ideas/advice 关于我在这里遗漏了什么?

谢谢

这是几个人长期提出的要求,但从未具体化为运算符,因此我向动态数据添加了一个新运算符,以在更改集通知中实现唯一性。

新运算符被称为EnsureUniqueKeys,可以应用于缓存

    var uniqueChanges = _myCache.Connect().EnsureUniqueKeys()

哪里

    _myCache.Edit(innerCache =>
    {
        innerCache.AddOrUpdate(new Person("Me", 20));
        innerCache.AddOrUpdate(new Person("Me", 21));
        innerCache.AddOrUpdate(new Person("Me", 22));
    });

将为 Person("Me", 22)

生成一个添加通知

  _myCache.Edit(innerCache =>
    {
        innerCache.AddOrUpdate(new Person("Me", 20));
        innerCache.AddOrUpdate(new Person("Me", 21));
        innerCache.RemoveKey("Me");
    });

将产生一个空的更改集,因为在同一编辑中添加、更新和删除了项目。

运营商可以在TickService中这样申请:

 NewTicks = _ticksCache
        .Connect()
        .Filter(tick => tick.LastUpdate > DateTime.UtcNow - TimeSpan.FromSeconds(1))
        .Batch(TimeSpan.FromSeconds(1))
        .EnsureUniqueKeys()      
        .AsObservableCache();