处理合并的内部订阅
Dispose inner subscription of merge
!!警告:Rx 新手!!
我们有多个喂价。要求是订阅所有这些提要,并且每 1 秒只输出最新的报价(油门)
public static class FeedHandler
{
private static IObservable<PriceTick> _combinedPriceFeed = null;
private static double _throttleFrequency = 1000;
public static void AddToCombinedFeed(IObservable<PriceTick> feed)
{
_combinedPriceFeed = _combinedPriceFeed != null ? _combinedPriceFeed.Merge(feed) : feed;
AddFeed(_combinedPriceFeed);
}
private static IDisposable _subscriber;
private static void AddFeed(IObservable<PriceTick> feed)
{
_subscriber?.Dispose();
_subscriber = feed.Buffer(TimeSpan.FromMilliseconds(_throttleFrequency)).Subscribe(buffer => buffer.GroupBy(x => x.InstrumentId, (key, result) => result.First()).ToObservable().Subscribe(NotifyClient));
}
public static void NotifyClient(PriceTick tick)
{
//Do some action
}
}
代码有多个问题。如果我多次使用相同的提要调用 AddToCombinedFeed,流将从一开始就被复制。例如。以下
IObservable<PriceTick> feed1;
FeedHandler.AddToCombinedFeed(feed1);//1 stream
FeedHandler.AddToCombinedFeed(feed1);//2 streams(even though the groupby and first() functions will prevent this effect to propagate further
这让我想到了这个问题。如果我想从合并流中删除一个价格流,我该怎么做?
更新 - 新解决方案
使用 RolandPheasant 的 Dynamic-Data(麻省理工学院许可证)和 Nuget。
- 使用 SourceList 而不是 List
- 使用 MergeMany 运算符
代码:
public class FeedHandler
{
private readonly IDisposable _subscriber;
private readonly SourceList<IObservable<PriceTick>> _feeds = new SourceList<IObservable<PriceTick>>();
private readonly double _throttleFrequency = 1000;
public FeedHandler()
{
var combinedPriceFeed = _feeds.Connect().MergeMany(x => x).Buffer(TimeSpan.FromMilliseconds(_throttleFrequency)).SelectMany(buffer => buffer.GroupBy(x => x.InstrumentId, (key, result) => result.First()));
_subscriber = combinedPriceFeed.Subscribe(NotifyClient);
}
public void AddFeed(IObservable<PriceTick> feed) => _feeds.Add(feed);
public void NotifyClient(PriceTick tick)
{
//Do some action
}
}
旧解决方案
- 通过应用 Switch() 技术消除重新订阅的需要。
您的 _combinedPriceFeed 只是切换到下一个可观察到的
将由 _combinePriceFeedChange 提供。
- 保留一个列表来管理您的多个供稿。每当列表更改时创建新的可观察对象并通过 _combinePriceFeedChange 提供它。
- 你应该得到相应的删除方法的逻辑。
代码:
public class FeedHandler
{
private readonly IDisposable _subscriber;
private readonly IObservable<PriceTick> _combinedPriceFeed;
private readonly List<IObservable<PriceTick>> _feeds = new List<IObservable<PriceTick>>();
private readonly BehaviorSubject<IObservable<PriceTick>> _combinedPriceFeedChange = new BehaviorSubject<IObservable<PriceTick>>(Observable.Never<PriceTick>());
private readonly double _throttleFrequency = 1000;
public FeedHandler()
{
_combinedPriceFeed = _combinedPriceFeedChange.Switch().Buffer(TimeSpan.FromMilliseconds(_throttleFrequency)).SelectMany(buffer => buffer.GroupBy(x => x.InstrumentId, (key, result) => result.First()));
_subscriber = _combinedPriceFeed.Subscribe(NotifyClient);
}
public void AddFeed(IObservable<PriceTick> feed)
{
_feeds.Add(feed);
_combinedPriceFeedChange.OnNext(_feeds.Merge());
}
public void NotifyClient(PriceTick tick)
{
//Do some action
}
}
这是您需要的代码:
private static SerialDisposable _subscriber = new SerialDisposable();
private static void AddFeed(IObservable<PriceTick> feed)
{
_subscriber.Disposable =
feed
.Buffer(TimeSpan.FromMilliseconds(_throttleFrequency))
.SelectMany(buffer =>
buffer
.GroupBy(x => x.InstrumentId, (key, result) => result.First()))
.Subscribe(NotifyClient);
}
!!警告:Rx 新手!!
我们有多个喂价。要求是订阅所有这些提要,并且每 1 秒只输出最新的报价(油门)
public static class FeedHandler
{
private static IObservable<PriceTick> _combinedPriceFeed = null;
private static double _throttleFrequency = 1000;
public static void AddToCombinedFeed(IObservable<PriceTick> feed)
{
_combinedPriceFeed = _combinedPriceFeed != null ? _combinedPriceFeed.Merge(feed) : feed;
AddFeed(_combinedPriceFeed);
}
private static IDisposable _subscriber;
private static void AddFeed(IObservable<PriceTick> feed)
{
_subscriber?.Dispose();
_subscriber = feed.Buffer(TimeSpan.FromMilliseconds(_throttleFrequency)).Subscribe(buffer => buffer.GroupBy(x => x.InstrumentId, (key, result) => result.First()).ToObservable().Subscribe(NotifyClient));
}
public static void NotifyClient(PriceTick tick)
{
//Do some action
}
}
代码有多个问题。如果我多次使用相同的提要调用 AddToCombinedFeed,流将从一开始就被复制。例如。以下
IObservable<PriceTick> feed1;
FeedHandler.AddToCombinedFeed(feed1);//1 stream
FeedHandler.AddToCombinedFeed(feed1);//2 streams(even though the groupby and first() functions will prevent this effect to propagate further
这让我想到了这个问题。如果我想从合并流中删除一个价格流,我该怎么做?
更新 - 新解决方案
使用 RolandPheasant 的 Dynamic-Data(麻省理工学院许可证)和 Nuget。
- 使用 SourceList 而不是 List
- 使用 MergeMany 运算符
代码:
public class FeedHandler
{
private readonly IDisposable _subscriber;
private readonly SourceList<IObservable<PriceTick>> _feeds = new SourceList<IObservable<PriceTick>>();
private readonly double _throttleFrequency = 1000;
public FeedHandler()
{
var combinedPriceFeed = _feeds.Connect().MergeMany(x => x).Buffer(TimeSpan.FromMilliseconds(_throttleFrequency)).SelectMany(buffer => buffer.GroupBy(x => x.InstrumentId, (key, result) => result.First()));
_subscriber = combinedPriceFeed.Subscribe(NotifyClient);
}
public void AddFeed(IObservable<PriceTick> feed) => _feeds.Add(feed);
public void NotifyClient(PriceTick tick)
{
//Do some action
}
}
旧解决方案
- 通过应用 Switch() 技术消除重新订阅的需要。
您的 _combinedPriceFeed 只是切换到下一个可观察到的
将由 _combinePriceFeedChange 提供。
- 保留一个列表来管理您的多个供稿。每当列表更改时创建新的可观察对象并通过 _combinePriceFeedChange 提供它。
- 你应该得到相应的删除方法的逻辑。
代码:
public class FeedHandler
{
private readonly IDisposable _subscriber;
private readonly IObservable<PriceTick> _combinedPriceFeed;
private readonly List<IObservable<PriceTick>> _feeds = new List<IObservable<PriceTick>>();
private readonly BehaviorSubject<IObservable<PriceTick>> _combinedPriceFeedChange = new BehaviorSubject<IObservable<PriceTick>>(Observable.Never<PriceTick>());
private readonly double _throttleFrequency = 1000;
public FeedHandler()
{
_combinedPriceFeed = _combinedPriceFeedChange.Switch().Buffer(TimeSpan.FromMilliseconds(_throttleFrequency)).SelectMany(buffer => buffer.GroupBy(x => x.InstrumentId, (key, result) => result.First()));
_subscriber = _combinedPriceFeed.Subscribe(NotifyClient);
}
public void AddFeed(IObservable<PriceTick> feed)
{
_feeds.Add(feed);
_combinedPriceFeedChange.OnNext(_feeds.Merge());
}
public void NotifyClient(PriceTick tick)
{
//Do some action
}
}
这是您需要的代码:
private static SerialDisposable _subscriber = new SerialDisposable();
private static void AddFeed(IObservable<PriceTick> feed)
{
_subscriber.Disposable =
feed
.Buffer(TimeSpan.FromMilliseconds(_throttleFrequency))
.SelectMany(buffer =>
buffer
.GroupBy(x => x.InstrumentId, (key, result) => result.First()))
.Subscribe(NotifyClient);
}