如何加入滑动windows
How to join in sliding windows
我有一系列股票代码和一系列股票价格。每次我得到一个股票代码(它保证是唯一的),我需要跨越 100 毫秒 window 并根据股票价格序列处理股票价格。如果在这 100 毫秒内序列中缺少股票价格,我需要处理没有价格的股票。
更好地可视化需求的大理石图:
Stock : -S1--S2--
Price : ---P1-P2-
Result: -S1---S2P2-
所以股票1
进来了,跨越了100毫秒window,但是符号没有价格,因此结果应该是只存 1
(S1
).
然后股票 2
进来,又过了 100 毫秒 window,股票 2
还没有价格。然而,在 100 毫秒 window 关闭之前,我们得到股票 2
的价格(P2
),因此结果是股票 2
及其价格(S2P2
) .
价格可能会随机出现,因此无法对价格顺序进行假设。
我看过这个 related SO question 但无法让它发挥作用。
我正在尝试使用 GroupJoin
stockSubject
.GroupJoin(
stockPriceSubject,
stock => Observable.Timer(TimeSpan.FromMilliseconds(100)),
price => Observable.Never<Unit>(),
(stock, stockPrice) =>
{
var stockPrices = stockPrice.Where(sprice => sprice.Stock.Equals(stock))
.FirstOrDefaultAsync()
.DefaultIfEmpty();
return (Stock: stock, StockPrices: stockPrices);
})
.Subscribe(async tuple => WriteLine($"{tuple.Stock} | {(await tuple.StockPrices)?.Price ?? 'N'}"));
这不起作用,因为它降低了一些价格(发生不确定性,所以无法真正找出问题所在)。
我尝试的另一种方法在工作时看起来并不理想
stockSubject
.Subscribe(stock =>
{
stockPriceSubject
.Buffer(TimeSpan.FromMilliseconds(100))
.Take(1)
.Subscribe(bufferedPrices =>
{
var stockPrice = bufferedPrices.FirstOrDefault(price => price.Stock.Equals(stock));
if (stockPrice == null)
{
Console.WriteLine($"{stock} is w/o price");
return;
}
Console.WriteLine($"{stockPrice}");
});
});
有一件事我真的不喜欢这个,就是当我订阅缓冲价格时,每次有新股票时我都会放弃订阅。
知道什么是使用 Rx 实现此场景的最佳方法吗?
与股票和股价相关类
sealed class Stock : IEquatable<Stock>
{
public Stock(string symbol)
{
Symbol = symbol;
}
public string Symbol { get; }
public override string ToString() =>
$"Stock[{Symbol}]";
// IEquatable implementation is emitted for the sake of brevity
}
sealed class StockPrice
{
public StockPrice(Stock stock, decimal price)
{
Stock = stock;
Price = price;
}
public Stock Stock { get; }
public decimal Price { get; }
public override string ToString() =>
$"{Stock} is traded @ {Price}";
}
编辑按要求添加测试数据代码生成器
每 10 毫秒将新股票推送到股票序列 (MSFT -> GOOG -> APPL).
每 20 毫秒将新价格推送到价格序列 (APPL -> GOOG)。
1 秒后股票价格 MSFT 被推送到价格序列。
预期输出:
一旦MSFT被推送到股票序列,100毫秒window开盘...100毫秒内MSFT无价格 被推到价格序列,因此 MSFT 股票应该在没有价格的情况下处理(在结果集中价格是 empty/null)
GOOG推入股票序列后,再次100毫秒window开仓,这次有GOOG[=91的价格=] 股票在 100 毫秒内,因此 GOOG 股票应该以价格 (15m) 处理。
最后 APPL - 这里的预期输出与 MSFT 相同...因为没有为 [=73 推送价格=]APPL 100 毫秒内window 因为它被推送到股票序列,所以应该不带价格处理。在这里,之前发布的 APPL 股价应该不会有任何影响。
var stockSubject = new Subject<Stock>();
var stockPriceSubject = new Subject<StockPrice>();
Observable
.Interval(TimeSpan.FromMilliseconds(10))
.Take(3)
.Subscribe(_ =>
{
switch (_)
{
case 0:
{
var stock = new Stock("MSFT");
stockSubject.OnNext(stock);
break;
}
case 1:
{
var stock = new Stock("GOOG");
stockSubject.OnNext(stock);
break;
}
case 2:
{
var stock = new Stock("APPL");
stockSubject.OnNext(stock);
break;
}
}
});
Observable
.Interval(TimeSpan.FromMilliseconds(20))
.Take(3)
.Subscribe(_ =>
{
switch (_)
{
case 0:
{
var stockPrice = new StockPrice(new Stock("APPL"), 10m);
stockPriceSubject.OnNext(stockPrice);
break;
}
case 1:
{
var stockPrice = new StockPrice(new Stock("GOOG"), 15m);
stockPriceSubject.OnNext(stockPrice);
break;
}
}
});
Observable
.Timer(TimeSpan.FromSeconds(1))
.Subscribe(_ =>
{
var stockPrice = new StockPrice(new Stock("MSFT"), 20m);
stockPriceSubject.OnNext(stockPrice);
});
没有一些测试代码就无法测试您的答案。我也不确定您想对下游数据做什么。如果此答案不够充分,请使用该信息修改问题。
我认为您所问的解决方案非常简单:
stocks
.Select(s => (Stock: s, StockPrices: prices
.TakeUntil(Observable.Timer(TimeSpan.FromMilliseconds(100)))
.Where(p => p.Stock == s)
));
这将导致针对 prices
的多个订阅问题,因此可以通过以下方式解决:
prices.Publish(_prices =>
stocks
.Select(s => (Stock: s, StockPrices: _prices
.TakeUntil(Observable.Timer(TimeSpan.FromMilliseconds(100)))
.Where(p => p.Stock == s)
))
);
Join
和 GroupJoin
在股票价格为 0 的情况下效果不佳。我不会为你的场景推荐它。但是,如果您返回它,则应将 Observable.Never
更改为 Observable.Empty
。 Never
使价格 window 永远开放,因此可以将旧价格与新股票合并。
编辑:
下面是一些使用 Microsoft.Reactive.Testing
:
的测试代码
TestScheduler ts = new TestScheduler();
var stockSource = ts.CreateHotObservable<Stock>(
new Recorded<Notification<Stock>>(10.MsTicks(), Notification.CreateOnNext(new Stock("MSFT"))),
new Recorded<Notification<Stock>>(20.MsTicks(), Notification.CreateOnNext(new Stock("GOOG"))),
new Recorded<Notification<Stock>>(30.MsTicks(), Notification.CreateOnNext(new Stock("AAPL")))
);
var priceSource = ts.CreateHotObservable<StockPrice>(
new Recorded<Notification<StockPrice>>(20.MsTicks(), Notification.CreateOnNext(new StockPrice(new Stock("AAPL"), 10m))),
new Recorded<Notification<StockPrice>>(40.MsTicks(), Notification.CreateOnNext(new StockPrice(new Stock("GOOG"), 15m)))
);
var target = priceSource.Publish(_prices =>
stockSource
.Select(s => (Stock: s, StockPrices: _prices
.TakeUntil(Observable.Timer(TimeSpan.FromMilliseconds(100), ts))
.Where(p => p.Stock.Symbol == s.Symbol)
))
);
var observer = ts.CreateObserver<(Stock, IObservable<StockPrice>)>();
target.Subscribe(observer);
var target2 = target.SelectMany(t => t.StockPrices.Select(sp => (Stock: t.Stock, Price: sp)));
var observer2 = ts.CreateObserver<(Stock, StockPrice)>();
target2.Subscribe(observer2);
ts.Start();
observer.Messages.Dump(); //LinqPad
observer2.Messages.Dump(); //LinqPad
并使用扩展方法:
public static class Extensions
{
public static long MsTicks(this int i)
{
return TimeSpan.FromMilliseconds(i).Ticks;
}
}
对我来说,这行得通。唯一的问题是 IEquatable
缺乏实施。所以我从 .Where(p => p.Stock == s)
切换到 .Where(p => p.Stock.Symbol == s.Symbol)
。这可能是你的问题吗?
我有一系列股票代码和一系列股票价格。每次我得到一个股票代码(它保证是唯一的),我需要跨越 100 毫秒 window 并根据股票价格序列处理股票价格。如果在这 100 毫秒内序列中缺少股票价格,我需要处理没有价格的股票。
更好地可视化需求的大理石图:
Stock : -S1--S2--
Price : ---P1-P2-
Result: -S1---S2P2-
所以股票1
进来了,跨越了100毫秒window,但是符号没有价格,因此结果应该是只存 1
(S1
).
然后股票 2
进来,又过了 100 毫秒 window,股票 2
还没有价格。然而,在 100 毫秒 window 关闭之前,我们得到股票 2
的价格(P2
),因此结果是股票 2
及其价格(S2P2
) .
价格可能会随机出现,因此无法对价格顺序进行假设。
我看过这个 related SO question 但无法让它发挥作用。
我正在尝试使用 GroupJoin
stockSubject
.GroupJoin(
stockPriceSubject,
stock => Observable.Timer(TimeSpan.FromMilliseconds(100)),
price => Observable.Never<Unit>(),
(stock, stockPrice) =>
{
var stockPrices = stockPrice.Where(sprice => sprice.Stock.Equals(stock))
.FirstOrDefaultAsync()
.DefaultIfEmpty();
return (Stock: stock, StockPrices: stockPrices);
})
.Subscribe(async tuple => WriteLine($"{tuple.Stock} | {(await tuple.StockPrices)?.Price ?? 'N'}"));
这不起作用,因为它降低了一些价格(发生不确定性,所以无法真正找出问题所在)。
我尝试的另一种方法在工作时看起来并不理想
stockSubject
.Subscribe(stock =>
{
stockPriceSubject
.Buffer(TimeSpan.FromMilliseconds(100))
.Take(1)
.Subscribe(bufferedPrices =>
{
var stockPrice = bufferedPrices.FirstOrDefault(price => price.Stock.Equals(stock));
if (stockPrice == null)
{
Console.WriteLine($"{stock} is w/o price");
return;
}
Console.WriteLine($"{stockPrice}");
});
});
有一件事我真的不喜欢这个,就是当我订阅缓冲价格时,每次有新股票时我都会放弃订阅。
知道什么是使用 Rx 实现此场景的最佳方法吗?
与股票和股价相关类
sealed class Stock : IEquatable<Stock>
{
public Stock(string symbol)
{
Symbol = symbol;
}
public string Symbol { get; }
public override string ToString() =>
$"Stock[{Symbol}]";
// IEquatable implementation is emitted for the sake of brevity
}
sealed class StockPrice
{
public StockPrice(Stock stock, decimal price)
{
Stock = stock;
Price = price;
}
public Stock Stock { get; }
public decimal Price { get; }
public override string ToString() =>
$"{Stock} is traded @ {Price}";
}
编辑按要求添加测试数据代码生成器
每 10 毫秒将新股票推送到股票序列 (MSFT -> GOOG -> APPL).
每 20 毫秒将新价格推送到价格序列 (APPL -> GOOG)。
1 秒后股票价格 MSFT 被推送到价格序列。
预期输出:
一旦MSFT被推送到股票序列,100毫秒window开盘...100毫秒内MSFT无价格 被推到价格序列,因此 MSFT 股票应该在没有价格的情况下处理(在结果集中价格是 empty/null)
GOOG推入股票序列后,再次100毫秒window开仓,这次有GOOG[=91的价格=] 股票在 100 毫秒内,因此 GOOG 股票应该以价格 (15m) 处理。
最后 APPL - 这里的预期输出与 MSFT 相同...因为没有为 [=73 推送价格=]APPL 100 毫秒内window 因为它被推送到股票序列,所以应该不带价格处理。在这里,之前发布的 APPL 股价应该不会有任何影响。
var stockSubject = new Subject<Stock>();
var stockPriceSubject = new Subject<StockPrice>();
Observable
.Interval(TimeSpan.FromMilliseconds(10))
.Take(3)
.Subscribe(_ =>
{
switch (_)
{
case 0:
{
var stock = new Stock("MSFT");
stockSubject.OnNext(stock);
break;
}
case 1:
{
var stock = new Stock("GOOG");
stockSubject.OnNext(stock);
break;
}
case 2:
{
var stock = new Stock("APPL");
stockSubject.OnNext(stock);
break;
}
}
});
Observable
.Interval(TimeSpan.FromMilliseconds(20))
.Take(3)
.Subscribe(_ =>
{
switch (_)
{
case 0:
{
var stockPrice = new StockPrice(new Stock("APPL"), 10m);
stockPriceSubject.OnNext(stockPrice);
break;
}
case 1:
{
var stockPrice = new StockPrice(new Stock("GOOG"), 15m);
stockPriceSubject.OnNext(stockPrice);
break;
}
}
});
Observable
.Timer(TimeSpan.FromSeconds(1))
.Subscribe(_ =>
{
var stockPrice = new StockPrice(new Stock("MSFT"), 20m);
stockPriceSubject.OnNext(stockPrice);
});
没有一些测试代码就无法测试您的答案。我也不确定您想对下游数据做什么。如果此答案不够充分,请使用该信息修改问题。
我认为您所问的解决方案非常简单:
stocks
.Select(s => (Stock: s, StockPrices: prices
.TakeUntil(Observable.Timer(TimeSpan.FromMilliseconds(100)))
.Where(p => p.Stock == s)
));
这将导致针对 prices
的多个订阅问题,因此可以通过以下方式解决:
prices.Publish(_prices =>
stocks
.Select(s => (Stock: s, StockPrices: _prices
.TakeUntil(Observable.Timer(TimeSpan.FromMilliseconds(100)))
.Where(p => p.Stock == s)
))
);
Join
和 GroupJoin
在股票价格为 0 的情况下效果不佳。我不会为你的场景推荐它。但是,如果您返回它,则应将 Observable.Never
更改为 Observable.Empty
。 Never
使价格 window 永远开放,因此可以将旧价格与新股票合并。
编辑:
下面是一些使用 Microsoft.Reactive.Testing
:
TestScheduler ts = new TestScheduler();
var stockSource = ts.CreateHotObservable<Stock>(
new Recorded<Notification<Stock>>(10.MsTicks(), Notification.CreateOnNext(new Stock("MSFT"))),
new Recorded<Notification<Stock>>(20.MsTicks(), Notification.CreateOnNext(new Stock("GOOG"))),
new Recorded<Notification<Stock>>(30.MsTicks(), Notification.CreateOnNext(new Stock("AAPL")))
);
var priceSource = ts.CreateHotObservable<StockPrice>(
new Recorded<Notification<StockPrice>>(20.MsTicks(), Notification.CreateOnNext(new StockPrice(new Stock("AAPL"), 10m))),
new Recorded<Notification<StockPrice>>(40.MsTicks(), Notification.CreateOnNext(new StockPrice(new Stock("GOOG"), 15m)))
);
var target = priceSource.Publish(_prices =>
stockSource
.Select(s => (Stock: s, StockPrices: _prices
.TakeUntil(Observable.Timer(TimeSpan.FromMilliseconds(100), ts))
.Where(p => p.Stock.Symbol == s.Symbol)
))
);
var observer = ts.CreateObserver<(Stock, IObservable<StockPrice>)>();
target.Subscribe(observer);
var target2 = target.SelectMany(t => t.StockPrices.Select(sp => (Stock: t.Stock, Price: sp)));
var observer2 = ts.CreateObserver<(Stock, StockPrice)>();
target2.Subscribe(observer2);
ts.Start();
observer.Messages.Dump(); //LinqPad
observer2.Messages.Dump(); //LinqPad
并使用扩展方法:
public static class Extensions
{
public static long MsTicks(this int i)
{
return TimeSpan.FromMilliseconds(i).Ticks;
}
}
对我来说,这行得通。唯一的问题是 IEquatable
缺乏实施。所以我从 .Where(p => p.Stock == s)
切换到 .Where(p => p.Stock.Symbol == s.Symbol)
。这可能是你的问题吗?