如何加入滑动windows

How to join in sliding windows

我有一系列股票代码和一系列股票价格。每次我得到一个股票代码(它保证是唯一的),我需要跨越 100 毫秒 window 并根据股票价格序列处理股票价格。如果在这 100 毫秒内序列中缺少股票价格,我需要处理没有价格的股票。

更好地可视化需求的大理石图:

Stock : -S1--S2--

Price : ---P1-P2-

Result: -S1---S2P2-

所以股票1进来了,跨越了100毫秒window,但是符号没有价格,因此结果应该是只存 1 (S1).

然后股票 2 进来,又过了 100 毫秒 window,股票 2 还没有价格。然而,在 100 毫秒 window 关闭之前,我们得到股票 2 的价格(P2),因此结果是股票 2 及其价格(S2P2) .

价格可能会随机出现,因此无法对价格顺序进行假设。

我看过这个 related SO question 但无法让它发挥作用。 我正在尝试使用 GroupJoin

stockSubject
.GroupJoin(
    stockPriceSubject,
    stock => Observable.Timer(TimeSpan.FromMilliseconds(100)),
    price => Observable.Never<Unit>(),
    (stock, stockPrice) =>
    {            
        var stockPrices = stockPrice.Where(sprice => sprice.Stock.Equals(stock))
                                    .FirstOrDefaultAsync()
                                    .DefaultIfEmpty();
        return (Stock: stock, StockPrices: stockPrices);
    })
.Subscribe(async tuple => WriteLine($"{tuple.Stock} | {(await tuple.StockPrices)?.Price ?? 'N'}"));

这不起作用,因为它降低了一些价格(发生不确定性,所以无法真正找出问题所在)。

我尝试的另一种方法在工作时看起来并不理想

stockSubject
    .Subscribe(stock =>
    {
        stockPriceSubject
            .Buffer(TimeSpan.FromMilliseconds(100))
            .Take(1)
            .Subscribe(bufferedPrices =>
            {
                var stockPrice = bufferedPrices.FirstOrDefault(price => price.Stock.Equals(stock));
                if (stockPrice == null)
                {
                    Console.WriteLine($"{stock} is w/o price");
                    return;
                }

                Console.WriteLine($"{stockPrice}");
            });
    });

有一件事我真的不喜欢这个,就是当我订阅缓冲价格时,每次有新股票时我都会放弃订阅。

知道什么是使用 Rx 实现此场景的最佳方法吗?

与股票和股价相关类

sealed class Stock : IEquatable<Stock>
{
    public Stock(string symbol)
    {
        Symbol = symbol;
    }

    public string Symbol { get; }

    public override string ToString() =>
        $"Stock[{Symbol}]";

    // IEquatable implementation is emitted for the sake of brevity
}

sealed class StockPrice
{
    public StockPrice(Stock stock, decimal price)
    {
        Stock = stock;
        Price = price;
    }

    public Stock Stock { get; }
    public decimal Price { get; }

    public override string ToString() =>
        $"{Stock} is traded @ {Price}";
}

编辑按要求添加测试数据代码生成器

每 10 毫秒将新股票推送到股票序列 (MSFT -> GOOG -> APPL).

每 20 毫秒将新价格推送到价格序列 (APPL -> GOOG)。

1 秒后股票价格 MSFT 被推送到价格序列。

预期输出:

一旦MSFT被推送到股票序列,100毫秒window开盘...100毫秒内MSFT无价格 被推到价格序列,因此 MSFT 股票应该在没有价格的情况下处理(在结果集中价格是 empty/null)

GOOG推入股票序列后,再次100毫秒window开仓,这次有GOOG[=91的价格=] 股票在 100 毫秒内,因此 GOOG 股票应该以价格 (15m) 处理。

最后 APPL - 这里的预期输出与 MSFT 相同...因为没有为 [=73 推送价格=]APPL 100 毫秒内window 因为它被推送到股票序列,所以应该不带价格处理。在这里,之前发布的 APPL 股价应该不会有任何影响。

var stockSubject = new Subject<Stock>();
var stockPriceSubject = new Subject<StockPrice>();

Observable
   .Interval(TimeSpan.FromMilliseconds(10))
   .Take(3)
   .Subscribe(_ =>
   {
       switch (_)
       {
           case 0:
               {
                   var stock = new Stock("MSFT");
                   stockSubject.OnNext(stock);
                   break;
               }
           case 1:
               {
                   var stock = new Stock("GOOG");
                   stockSubject.OnNext(stock);
                   break;
               }
           case 2:
               {
                   var stock = new Stock("APPL");
                   stockSubject.OnNext(stock);
                   break;
               }
       }
   });

Observable
    .Interval(TimeSpan.FromMilliseconds(20))
    .Take(3)
    .Subscribe(_ =>
    {
        switch (_)
        {
            case 0:
                {
                    var stockPrice = new StockPrice(new Stock("APPL"), 10m);
                    stockPriceSubject.OnNext(stockPrice);
                    break;
                }
            case 1:
                {
                    var stockPrice = new StockPrice(new Stock("GOOG"), 15m);
                    stockPriceSubject.OnNext(stockPrice);
                    break;
                }
        }
    });

Observable
    .Timer(TimeSpan.FromSeconds(1))
    .Subscribe(_ =>
    {
        var stockPrice = new StockPrice(new Stock("MSFT"), 20m);
        stockPriceSubject.OnNext(stockPrice);
    });

没有一些测试代码就无法测试您的答案。我也不确定您想对下游数据做什么。如果此答案不够充分,请使用该信息修改问题。

我认为您所问的解决方案非常简单:

stocks
    .Select(s => (Stock: s, StockPrices: prices
        .TakeUntil(Observable.Timer(TimeSpan.FromMilliseconds(100)))
        .Where(p => p.Stock == s)
    ));

这将导致针对 prices 的多个订阅问题,因此可以通过以下方式解决:

prices.Publish(_prices => 
    stocks
        .Select(s => (Stock: s, StockPrices: _prices
            .TakeUntil(Observable.Timer(TimeSpan.FromMilliseconds(100)))
            .Where(p => p.Stock == s)
        ))
    );

JoinGroupJoin 在股票价格为 0 的情况下效果不佳。我不会为你的场景推荐它。但是,如果您返回它,则应将 Observable.Never 更改为 Observable.EmptyNever 使价格 window 永远开放,因此可以将旧价格与新股票合并。


编辑:

下面是一些使用 Microsoft.Reactive.Testing:

的测试代码
TestScheduler ts = new TestScheduler();
var stockSource = ts.CreateHotObservable<Stock>(
    new Recorded<Notification<Stock>>(10.MsTicks(), Notification.CreateOnNext(new Stock("MSFT"))),
    new Recorded<Notification<Stock>>(20.MsTicks(), Notification.CreateOnNext(new Stock("GOOG"))),
    new Recorded<Notification<Stock>>(30.MsTicks(), Notification.CreateOnNext(new Stock("AAPL")))
);

var priceSource = ts.CreateHotObservable<StockPrice>(
    new Recorded<Notification<StockPrice>>(20.MsTicks(), Notification.CreateOnNext(new StockPrice(new Stock("AAPL"), 10m))),
    new Recorded<Notification<StockPrice>>(40.MsTicks(), Notification.CreateOnNext(new StockPrice(new Stock("GOOG"), 15m)))
);


var target = priceSource.Publish(_prices =>
    stockSource
        .Select(s => (Stock: s, StockPrices: _prices
            .TakeUntil(Observable.Timer(TimeSpan.FromMilliseconds(100), ts))
            .Where(p => p.Stock.Symbol == s.Symbol)
        ))
    );
var observer = ts.CreateObserver<(Stock, IObservable<StockPrice>)>();
target.Subscribe(observer);

var target2 = target.SelectMany(t => t.StockPrices.Select(sp => (Stock: t.Stock, Price: sp)));
var observer2 = ts.CreateObserver<(Stock, StockPrice)>();
target2.Subscribe(observer2);
ts.Start();

observer.Messages.Dump();   //LinqPad
observer2.Messages.Dump();  //LinqPad

并使用扩展方法:

public static class Extensions
{
    public static long MsTicks(this int i)
    {
        return TimeSpan.FromMilliseconds(i).Ticks;
    }
}

对我来说,这行得通。唯一的问题是 IEquatable 缺乏实施。所以我从 .Where(p => p.Stock == s) 切换到 .Where(p => p.Stock.Symbol == s.Symbol)。这可能是你的问题吗?